about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2025-10-14 13:13:59 +0200
committer: GitHub Enterprise <[email protected]> 2025-10-14 13:13:59 +0200
commit: 9b7580230798d83d9bb36d40150913af69a13929 (patch)
tree: 73552ec1d3e9d955ce391cad894c637b74be91d4
parent: move all storage-related services into storage tree (#571) (diff)
download: zen-9b7580230798d83d9bb36d40150913af69a13929.tar.xz
          zen-9b7580230798d83d9bb36d40150913af69a13929.zip
refactor builds cmd part2 (#572)
* fix metadata info in filebuildstorage GetBuild
* move MakeSafeAbsolutePathÍnPlace to filesystem.h/cpp
* add BuildsOperationUploadFolder op moving code from builds_cmd.cpp
-rw-r--r-- src/zen/cmds/builds_cmd.cpp 2961
-rw-r--r-- src/zencore/filesystem.cpp 27
-rw-r--r-- src/zencore/include/zencore/filesystem.h 3
-rw-r--r-- src/zenremotestore/builds/buildstorageoperations.cpp 2429
-rw-r--r-- src/zenremotestore/builds/filebuildstorage.cpp 6
-rw-r--r-- src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h 245
-rw-r--r-- src/zenutil/commandlineoptions.cpp 27
-rw-r--r-- src/zenutil/include/zenutil/commandlineoptions.h 10
8 files changed, 2982 insertions, 2726 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 00bba6f4f..b60844ea1 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -263,24 +263,6 @@ namespace {
}
}
- static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
- static const size_t DefaultMaxChunksPerBlock = 4u * 1000u;
- static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
-
- struct ChunksBlockParameters
- {
- size_t MaxBlockSize = DefaultMaxBlockSize;
- size_t MaxChunksPerBlock = DefaultMaxChunksPerBlock;
- size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
- };
-
- const ChunksBlockParameters DefaultChunksBlockParams{
- .MaxBlockSize = 32u * 1024u * 1024u,
- .MaxChunksPerBlock = DefaultMaxChunksPerBlock,
- .MaxChunkEmbedSize = 2u * 1024u * 1024u // DefaultChunkedParams.MaxSize
- };
- const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u;
-
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
@@ -296,42 +278,6 @@ namespace {
}
WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); }
- static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv),
- HashStringDjb2(".zip"sv),
- HashStringDjb2(".7z"sv),
- HashStringDjb2(".bzip"sv),
- HashStringDjb2(".rar"sv),
- HashStringDjb2(".gzip"sv),
- HashStringDjb2(".apk"sv),
- HashStringDjb2(".nsp"sv),
- HashStringDjb2(".xvc"sv),
- HashStringDjb2(".pkg"sv),
- HashStringDjb2(".dmg"sv),
- HashStringDjb2(".ipa"sv)});
-
- static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end());
-
- static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); }
-
- static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
- {
- ZEN_UNUSED(Content);
- const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
- if (ChunkLocationCount == 0)
- {
- return false;
- }
- const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
- const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex];
-
- const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash);
- return IsCompressable;
- }
-
- const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u;
-
const std::string ZenFolderName = ".zen";
std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; }
// std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json";
@@ -575,17 +521,6 @@ namespace {
}
}
- template<typename T>
- std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
- {
- ExtendableStringBuilder<512> SB;
- for (const T& Item : Items)
- {
- SB.Append(fmt::format("{}{}", Prefix, Item));
- }
- return SB.ToString();
- }
-
bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string> ExcludeDirectories)
{
ZEN_TRACE_CPU("CleanDirectory");
@@ -785,15 +720,6 @@ namespace {
return CleanWipe;
}
- IoBuffer WriteToTempFile(CompositeBuffer&& Buffer,
- const std::filesystem::path& TempFolderPath,
- const IoHash& Hash,
- const std::string& Suffix = {})
- {
- std::filesystem::path TempFilePath = TempFolderPath / (Hash.ToHexString() + Suffix);
- return WriteToTempFile(std::move(Buffer), TempFilePath);
- }
-
class FilteredRate
{
public:
@@ -886,161 +812,6 @@ namespace {
return Count * 1000000 / ElapsedWallTimeUS;
}
- struct FindBlocksStatistics
- {
- uint64_t FindBlockTimeMS = 0;
- uint64_t PotentialChunkCount = 0;
- uint64_t PotentialChunkByteCount = 0;
- uint64_t FoundBlockCount = 0;
- uint64_t FoundBlockChunkCount = 0;
- uint64_t FoundBlockByteCount = 0;
- uint64_t AcceptedBlockCount = 0;
- uint64_t AcceptedChunkCount = 0;
- uint64_t AcceptedByteCount = 0;
- uint64_t AcceptedRawByteCount = 0;
- uint64_t RejectedBlockCount = 0;
- uint64_t RejectedChunkCount = 0;
- uint64_t RejectedByteCount = 0;
- uint64_t AcceptedReduntantChunkCount = 0;
- uint64_t AcceptedReduntantByteCount = 0;
- uint64_t NewBlocksCount = 0;
- uint64_t NewBlocksChunkCount = 0;
- uint64_t NewBlocksChunkByteCount = 0;
- };
-
- struct UploadStatistics
- {
- std::atomic<uint64_t> BlockCount = 0;
- std::atomic<uint64_t> BlocksBytes = 0;
- std::atomic<uint64_t> ChunkCount = 0;
- std::atomic<uint64_t> ChunksBytes = 0;
- std::atomic<uint64_t> ReadFromDiskBytes = 0;
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
- uint64_t ElapsedWallTimeUS = 0;
-
- UploadStatistics& operator+=(const UploadStatistics& Rhs)
- {
- BlockCount += Rhs.BlockCount;
- BlocksBytes += Rhs.BlocksBytes;
- ChunkCount += Rhs.ChunkCount;
- ChunksBytes += Rhs.ChunksBytes;
- ReadFromDiskBytes += Rhs.ReadFromDiskBytes;
- MultipartAttachmentCount += Rhs.MultipartAttachmentCount;
- ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS;
- return *this;
- }
- };
-
- struct LooseChunksStatistics
- {
- uint64_t ChunkCount = 0;
- uint64_t ChunkByteCount = 0;
- std::atomic<uint64_t> CompressedChunkCount = 0;
- std::atomic<uint64_t> CompressedChunkRawBytes = 0;
- std::atomic<uint64_t> CompressedChunkBytes = 0;
- uint64_t CompressChunksElapsedWallTimeUS = 0;
-
- LooseChunksStatistics& operator+=(const LooseChunksStatistics& Rhs)
- {
- ChunkCount += Rhs.ChunkCount;
- ChunkByteCount += Rhs.ChunkByteCount;
- CompressedChunkCount += Rhs.CompressedChunkCount;
- CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes;
- CompressedChunkBytes += Rhs.CompressedChunkBytes;
- CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS;
- return *this;
- }
- };
-
- struct GenerateBlocksStatistics
- {
- std::atomic<uint64_t> GeneratedBlockByteCount = 0;
- std::atomic<uint64_t> GeneratedBlockCount = 0;
- uint64_t GenerateBlocksElapsedWallTimeUS = 0;
-
- GenerateBlocksStatistics& operator+=(const GenerateBlocksStatistics& Rhs)
- {
- GeneratedBlockByteCount += Rhs.GeneratedBlockByteCount;
- GeneratedBlockCount += Rhs.GeneratedBlockCount;
- GenerateBlocksElapsedWallTimeUS += Rhs.GenerateBlocksElapsedWallTimeUS;
- return *this;
- }
- };
-
- struct VerifyFolderStatistics
- {
- std::atomic<uint64_t> FilesVerified = 0;
- std::atomic<uint64_t> FilesFailed = 0;
- std::atomic<uint64_t> ReadBytes = 0;
- uint64_t VerifyElapsedWallTimeUs = 0;
- };
-
- std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
- const std::span<const uint32_t> LocalChunkOrder,
- const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
- const std::span<const uint32_t>& LooseChunkIndexes,
- const std::span<const ChunkBlockDescription>& BlockDescriptions)
- {
- ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders");
-
-#if EXTRA_VERIFY
- std::vector<IoHash> TmpAbsoluteChunkHashes;
- TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size());
-#endif // EXTRA_VERIFY
- std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex;
- LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1);
- std::uint32_t AbsoluteChunkCount = 0;
- for (uint32_t ChunkIndex : LooseChunkIndexes)
- {
- LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount;
-#if EXTRA_VERIFY
- TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]);
-#endif // EXTRA_VERIFY
- AbsoluteChunkCount++;
- }
- for (const ChunkBlockDescription& Block : BlockDescriptions)
- {
- for (const IoHash& ChunkHash : Block.ChunkRawHashes)
- {
- if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end())
- {
- const uint32_t LocalChunkIndex = It->second;
- ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash);
- LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount;
- }
-#if EXTRA_VERIFY
- TmpAbsoluteChunkHashes.push_back(ChunkHash);
-#endif // EXTRA_VERIFY
- AbsoluteChunkCount++;
- }
- }
- std::vector<uint32_t> AbsoluteChunkOrder;
- AbsoluteChunkOrder.reserve(LocalChunkHashes.size());
- for (const uint32_t LocalChunkIndex : LocalChunkOrder)
- {
- const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex];
-#if EXTRA_VERIFY
- ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]);
-#endif // EXTRA_VERIFY
- AbsoluteChunkOrder.push_back(AbsoluteChunkIndex);
- }
-#if EXTRA_VERIFY
- {
- uint32_t OrderIndex = 0;
- while (OrderIndex < LocalChunkOrder.size())
- {
- const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex];
- const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex];
- const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex];
- const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex];
- ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
- OrderIndex++;
- }
- }
-#endif // EXTRA_VERIFY
- return AbsoluteChunkOrder;
- }
-
void CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders,
const std::span<const IoHash> LooseChunkHashes,
const std::span<const uint64_t> LooseChunkRawSizes,
@@ -1111,86 +882,6 @@ namespace {
#endif // EXTRA_VERIFY
}
- void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter,
- const SourcePlatform Platform,
- std::span<const std::filesystem::path> Paths,
- std::span<const IoHash> RawHashes,
- std::span<const uint64_t> RawSizes,
- std::span<const uint32_t> Attributes,
- std::span<const IoHash> SequenceRawHashes,
- std::span<const uint32_t> ChunkCounts,
- std::span<const IoHash> LocalChunkHashes,
- std::span<const uint64_t> LocalChunkRawSizes,
- const std::vector<uint32_t>& AbsoluteChunkOrders,
- const std::span<const uint32_t> LooseLocalChunkIndexes,
- const std::span<IoHash> BlockHashes)
- {
- ZEN_ASSERT(Platform != SourcePlatform::_Count);
- PartManifestWriter.AddString("platform"sv, ToString(Platform));
-
- uint64_t TotalSize = 0;
- for (const uint64_t Size : RawSizes)
- {
- TotalSize += Size;
- }
- PartManifestWriter.AddInteger("totalSize", TotalSize);
-
- PartManifestWriter.BeginObject("files"sv);
- {
- compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter);
- compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter);
- compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter);
- if (Platform == SourcePlatform::Windows)
- {
- compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter);
- }
- if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS)
- {
- compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter);
- }
- }
- PartManifestWriter.EndObject(); // files
-
- PartManifestWriter.BeginObject("chunkedContent");
- {
- compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter);
- compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter);
- compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter);
- }
- PartManifestWriter.EndObject(); // chunkedContent
-
- size_t LooseChunkCount = LooseLocalChunkIndexes.size();
- if (LooseChunkCount > 0)
- {
- PartManifestWriter.BeginObject("chunkAttachments");
- {
- PartManifestWriter.BeginArray("rawHashes"sv);
- for (uint32_t ChunkIndex : LooseLocalChunkIndexes)
- {
- PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]);
- }
- PartManifestWriter.EndArray(); // rawHashes
-
- PartManifestWriter.BeginArray("chunkRawSizes"sv);
- for (uint32_t ChunkIndex : LooseLocalChunkIndexes)
- {
- PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]);
- }
- PartManifestWriter.EndArray(); // chunkSizes
- }
- PartManifestWriter.EndObject(); //
- }
-
- if (BlockHashes.size() > 0)
- {
- PartManifestWriter.BeginObject("blockAttachments");
- {
- compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter);
- }
- PartManifestWriter.EndObject(); // blocks
- }
- }
-
void ReadBuildContentFromCompactBinary(CbObjectView BuildPartManifest,
SourcePlatform& OutPlatform,
std::vector<std::filesystem::path>& OutPaths,
@@ -1468,72 +1159,6 @@ namespace {
TemporaryFile::SafeWriteFile(WritePath, JsonPayload);
}
- class ReadFileCache
- {
- public:
- // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten
- ReadFileCache(DiskStatistics& DiskStats,
- const std::filesystem::path& Path,
- const ChunkedFolderContent& LocalContent,
- const ChunkedContentLookup& LocalLookup,
- size_t MaxOpenFileCount)
- : m_Path(Path)
- , m_LocalContent(LocalContent)
- , m_LocalLookup(LocalLookup)
- , m_DiskStats(DiskStats)
- {
- m_OpenFiles.reserve(MaxOpenFileCount);
- }
- ~ReadFileCache() { m_OpenFiles.clear(); }
-
- CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
- {
- ZEN_TRACE_CPU("ReadFileCache::GetRange");
-
- auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) {
- return Lhs.first == SequenceIndex;
- });
- if (CacheIt != m_OpenFiles.end())
- {
- if (CacheIt != m_OpenFiles.begin())
- {
- auto CachedFile(std::move(CacheIt->second));
- m_OpenFiles.erase(CacheIt);
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
- }
- CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- return Result;
- }
- const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
- if (Size == m_LocalContent.RawSizes[LocalPathIndex])
- {
- IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
- return CompositeBuffer(SharedBuffer(Result));
- }
- if (m_OpenFiles.size() == m_OpenFiles.capacity())
- {
- m_OpenFiles.pop_back();
- }
- m_OpenFiles.insert(m_OpenFiles.begin(),
- std::make_pair(SequenceIndex,
- std::make_unique<BufferedOpenFile>(LocalFilePath,
- m_DiskStats.OpenReadCount,
- m_DiskStats.CurrentOpenFileCount,
- m_DiskStats.ReadCount,
- m_DiskStats.ReadByteCount)));
- CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- return Result;
- }
-
- private:
- const std::filesystem::path m_Path;
- const ChunkedFolderContent& m_LocalContent;
- const ChunkedContentLookup& m_LocalLookup;
- std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles;
- DiskStatistics& m_DiskStats;
- };
-
CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize)
{
ZEN_TRACE_CPU("ValidateBlob");
@@ -1643,95 +1268,6 @@ namespace {
return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash);
}
- CompositeBuffer FetchChunk(const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- const IoHash& ChunkHash,
- ReadFileCache& OpenFileCache)
- {
- ZEN_TRACE_CPU("FetchChunk");
- auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
- uint32_t ChunkIndex = It->second;
- std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
- ZEN_ASSERT(!ChunkLocations.empty());
- CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
- ChunkLocations[0].Offset,
- Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
- return Chunk;
- };
-
- CompressedBuffer GenerateBlock(const std::filesystem::path& Path,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- const std::vector<uint32_t>& ChunksInBlock,
- ChunkBlockDescription& OutBlockDescription,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("GenerateBlock");
- ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);
-
- std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
- BlockContent.reserve(ChunksInBlock.size());
- for (uint32_t ChunkIndex : ChunksInBlock)
- {
- BlockContent.emplace_back(std::make_pair(
- Content.ChunkedContent.ChunkHashes[ChunkIndex],
- [&Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
- CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
- if (!Chunk)
- {
- ZEN_ASSERT(false);
- }
- uint64_t RawSize = Chunk.GetSize();
- const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
- (RawSize >= MinimumSizeForCompressInBlock) &&
- IsChunkCompressable(Content, Lookup, ChunkIndex);
- const OodleCompressionLevel CompressionLevel =
- ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
- }));
- }
-
- return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
- };
-
- CompressedBuffer RebuildBlock(const std::filesystem::path& Path,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- CompositeBuffer&& HeaderBuffer,
- const std::vector<uint32_t>& ChunksInBlock,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("RebuildBlock");
- ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);
-
- std::vector<SharedBuffer> ResultBuffers;
- ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
- ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
- for (uint32_t ChunkIndex : ChunksInBlock)
- {
- std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
- ZEN_ASSERT(!ChunkLocations.empty());
- const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
- CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
- ChunkLocations[0].Offset,
- Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
-
- const uint64_t RawSize = Chunk.GetSize();
- const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
- (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex);
- const OodleCompressionLevel CompressionLevel =
- ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
-
- CompositeBuffer CompressedChunk =
- CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
- }
- return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
- };
-
struct ValidateStatistics
{
uint64_t BuildBlobSize = 0;
@@ -1790,7 +1326,7 @@ namespace {
}
}
ValidateStats.BuildBlobSize = Build.GetSize();
- uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
+ uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
{
PreferredMultipartChunkSize = ChunkSize;
@@ -2025,1079 +1561,6 @@ namespace {
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount);
}
- void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- uint64_t MaxBlockSize,
- uint64_t MaxChunksPerBlock,
- std::vector<uint32_t>& ChunkIndexes,
- std::vector<std::vector<uint32_t>>& OutBlocks)
- {
- ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
- std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
- const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
- const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
- if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
- {
- return true;
- }
- else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
- {
- return false;
- }
- return LhsLocation.Offset < RhsLocation.Offset;
- });
-
- uint64_t MaxBlockSizeLowThreshold = MaxBlockSize - (MaxBlockSize / 16);
-
- uint64_t BlockSize = 0;
-
- uint32_t ChunkIndexStart = 0;
- for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();)
- {
- const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset];
- const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
-
- if (((BlockSize + ChunkSize) > MaxBlockSize) || (ChunkIndexOffset - ChunkIndexStart) > MaxChunksPerBlock)
- {
- // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break
- // between source paths for chunks. Break the block at the last such break if any.
- ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);
-
- const uint32_t ChunkSequenceIndex =
- Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;
-
- uint64_t ScanBlockSize = BlockSize;
-
- uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1;
- while (ScanChunkIndexOffset > (ChunkIndexStart + 2))
- {
- const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset];
- const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex];
- if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold)
- {
- break;
- }
-
- const uint32_t TestSequenceIndex =
- Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
- if (ChunkSequenceIndex != TestSequenceIndex)
- {
- ChunkIndexOffset = ScanChunkIndexOffset + 1;
- break;
- }
-
- ScanBlockSize -= TestChunkSize;
- ScanChunkIndexOffset--;
- }
-
- std::vector<uint32_t> ChunksInBlock;
- ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart);
- for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++)
- {
- const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
- ChunksInBlock.push_back(AddChunkIndex);
- }
- OutBlocks.emplace_back(std::move(ChunksInBlock));
- BlockSize = 0;
- ChunkIndexStart = ChunkIndexOffset;
- }
- else
- {
- ChunkIndexOffset++;
- BlockSize += ChunkSize;
- }
- }
- if (ChunkIndexStart < ChunkIndexes.size())
- {
- std::vector<uint32_t> ChunksInBlock;
- ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart);
- for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++)
- {
- const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
- ChunksInBlock.push_back(AddChunkIndex);
- }
- OutBlocks.emplace_back(std::move(ChunksInBlock));
- }
- }
-
- CompositeBuffer CompressChunk(const std::filesystem::path& Path,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- uint32_t ChunkIndex,
- const std::filesystem::path& TempFolderPath,
- LooseChunksStatistics& LooseChunksStats)
- {
- ZEN_TRACE_CPU("CompressChunk");
- ZEN_ASSERT(!TempFolderPath.empty());
- const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
- const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
-
- const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
- const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
- IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
- if (!RawSource)
- {
- throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
- }
- if (RawSource.GetSize() != ChunkSize)
- {
- throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
- }
-
- const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex);
- const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
-
- if (ShouldCompressChunk)
- {
- std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString();
-
- BasicFile CompressedFile;
- std::error_code Ec;
- CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec);
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
- }
-
- uint64_t StreamRawBytes = 0;
- uint64_t StreamCompressedBytes = 0;
-
- bool CouldCompress = CompressedBuffer::CompressToStream(
- CompositeBuffer(SharedBuffer(RawSource)),
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- LooseChunksStats.CompressedChunkRawBytes += SourceSize;
- CompressedFile.Write(RangeBuffer, Offset);
- LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
- StreamRawBytes += SourceSize;
- StreamCompressedBytes += RangeBuffer.GetSize();
- },
- OodleCompressor::Mermaid,
- CompressionLevel);
- if (CouldCompress)
- {
- uint64_t CompressedSize = CompressedFile.FileSize();
- void* FileHandle = CompressedFile.Detach();
- IoBuffer TempPayload = IoBuffer(IoBuffer::File,
- FileHandle,
- 0,
- CompressedSize,
- /*IsWholeFile*/ true);
- ZEN_ASSERT(TempPayload);
- TempPayload.SetDeleteOnClose(true);
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
- ZEN_ASSERT(Compressed);
- ZEN_ASSERT(RawHash == ChunkHash);
- ZEN_ASSERT(RawSize == ChunkSize);
-
- LooseChunksStats.CompressedChunkCount++;
-
- return Compressed.GetCompressed();
- }
- else
- {
- LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes;
- LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
- }
- CompressedFile.Close();
- RemoveFile(TempFilePath, Ec);
- ZEN_UNUSED(Ec);
- }
-
- CompressedBuffer CompressedBlob =
- CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
- if (!CompressedBlob)
- {
- throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
- }
- ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
- ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
-
- LooseChunksStats.CompressedChunkRawBytes += ChunkSize;
- LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
-
- // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk
- if (ShouldCompressChunk)
- {
- IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash);
- CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
- }
-
- LooseChunksStats.CompressedChunkCount++;
- return std::move(CompressedBlob).GetCompressed();
- }
-
- struct GeneratedBlocks
- {
- std::vector<ChunkBlockDescription> BlockDescriptions;
- std::vector<uint64_t> BlockSizes;
- std::vector<CompositeBuffer> BlockHeaders;
- std::vector<CbObject> BlockMetaDatas;
- std::vector<uint8_t>
- MetaDataHasBeenUploaded; // NOTE: Do not use std::vector<bool> here as this vector is modified by multiple threads
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex;
- };
-
- void GenerateBuildBlocks(const std::filesystem::path& Path,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- StorageInstance& Storage,
- const Oid& BuildId,
- const std::vector<std::vector<uint32_t>>& NewBlockChunks,
- GeneratedBlocks& OutBlocks,
- DiskStatistics& DiskStats,
- UploadStatistics& UploadStats,
- GenerateBlocksStatistics& GenerateBlocksStats)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks");
- const std::size_t NewBlockCount = NewBlockChunks.size();
- if (NewBlockCount > 0)
- {
- ProgressBar ProgressBar(ProgressMode, "Generate Blocks");
-
- OutBlocks.BlockDescriptions.resize(NewBlockCount);
- OutBlocks.BlockSizes.resize(NewBlockCount);
- OutBlocks.BlockMetaDatas.resize(NewBlockCount);
- OutBlocks.BlockHeaders.resize(NewBlockCount);
- OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0);
- OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);
-
- RwLock Lock;
-
- WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool();
- WorkerThreadPool& UploadBlocksPool = GetNetworkPool();
-
- FilteredRate FilteredGeneratedBytesPerSecond;
- FilteredRate FilteredUploadedBytesPerSecond;
-
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
-
- for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
- {
- if (Work.IsAborted())
- {
- break;
- }
- const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
- Work.ScheduleWork(
- GenerateBlobsPool,
- [&Storage,
- BuildId,
- &Path,
- &Content,
- &Lookup,
- &Work,
- &UploadBlocksPool,
- NewBlockCount,
- ChunksInBlock,
- &Lock,
- &OutBlocks,
- &DiskStats,
- &GenerateBlocksStats,
- &UploadStats,
- &FilteredGeneratedBytesPerSecond,
- &QueuedPendingBlocksForUpload,
- &FilteredUploadedBytesPerSecond,
- BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
-
- FilteredGeneratedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
-
- Stopwatch GenerateTimer;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
- ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(CompressedBlock.GetCompressedSize()),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
-
- OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
- {
- CbObjectWriter Writer;
- Writer.AddString("createdBy", "zen");
- OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
- }
- GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
-
- Lock.WithExclusiveLock([&]() {
- OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockIndex);
- });
-
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
-
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- FilteredGeneratedBytesPerSecond.Stop();
- }
-
- if (QueuedPendingBlocksForUpload.load() > 16)
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
- {
- if (!AbortFlag)
- {
- QueuedPendingBlocksForUpload++;
-
- Work.ScheduleWork(
- UploadBlocksPool,
- [&Storage,
- BuildId,
- NewBlockCount,
- &FilteredUploadedBytesPerSecond,
- &QueuedPendingBlocksForUpload,
- &GenerateBlocksStats,
- &UploadStats,
- &OutBlocks,
- BlockIndex,
- Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
- auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
- if (!AbortFlag)
- {
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
-
- FilteredUploadedBytesPerSecond.Stop();
- std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
-
- FilteredUploadedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
-
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
- OutBlocks.BlockMetaDatas[BlockIndex]);
-
- const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
- const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
-
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- Payload.GetCompressed());
- }
-
- Storage.BuildStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
- UploadStats.BlocksBytes += CompressedBlockSize;
-
- ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- BlockHash,
- NiceBytes(CompressedBlockSize),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
-
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
- }
-
- bool MetadataSucceeded =
- Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- if (MetadataSucceeded)
- {
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
-
- OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- }
-
- UploadStats.BlockCount++;
- if (UploadStats.BlockCount == NewBlockCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- }
- }
- }
- });
- }
- }
- }
- });
- }
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
-
- FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
- FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
-
- std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
- GenerateBlocksStats.GeneratedBlockCount.load(),
- NewBlockCount,
- NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
- NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()),
- UploadStats.BlockCount.load(),
- NewBlockCount,
- NiceBytes(UploadStats.BlocksBytes.load()),
- NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8));
-
- ProgressBar.UpdateState(
- {.Task = "Generating blocks",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
- .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
-
- ZEN_ASSERT(AbortFlag || QueuedPendingBlocksForUpload.load() == 0);
-
- ProgressBar.Finish();
-
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
- }
- }
-
- void UploadPartBlobs(StorageInstance& Storage,
- const Oid& BuildId,
- const std::filesystem::path& Path,
- const std::filesystem::path& TempDir,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- std::span<IoHash> RawHashes,
- const std::vector<std::vector<uint32_t>>& NewBlockChunks,
- GeneratedBlocks& NewBlocks,
- std::span<const uint32_t> LooseChunkIndexes,
- const std::uint64_t LargeAttachmentSize,
- DiskStatistics& DiskStats,
- UploadStatistics& UploadStats,
- LooseChunksStatistics& LooseChunksStats)
- {
- ZEN_TRACE_CPU("UploadPartBlobs");
- {
- ProgressBar ProgressBar(ProgressMode, "Upload Blobs");
-
- WorkerThreadPool& ReadChunkPool = GetIOWorkerPool();
- WorkerThreadPool& UploadChunkPool = GetNetworkPool();
-
- FilteredRate FilteredGenerateBlockBytesPerSecond;
- FilteredRate FilteredCompressedBytesPerSecond;
- FilteredRate FilteredUploadedBytesPerSecond;
-
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- std::atomic<size_t> UploadedBlockSize = 0;
- std::atomic<size_t> UploadedBlockCount = 0;
- std::atomic<size_t> UploadedRawChunkSize = 0;
- std::atomic<size_t> UploadedCompressedChunkSize = 0;
- std::atomic<uint32_t> UploadedChunkCount = 0;
-
- tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex;
- ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size());
- for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++)
- {
- ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex);
- }
-
- std::vector<size_t> BlockIndexes;
- std::vector<uint32_t> LooseChunkOrderIndexes;
-
- uint64_t TotalLooseChunksSize = 0;
- uint64_t TotalBlocksSize = 0;
- for (const IoHash& RawHash : RawHashes)
- {
- if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end())
- {
- BlockIndexes.push_back(It->second);
- TotalBlocksSize += NewBlocks.BlockSizes[It->second];
- }
- else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t ChunkIndex = ChunkIndexIt->second;
- if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex);
- LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end())
- {
- LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second);
- TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- }
- }
- else
- {
- ZEN_CONSOLE_WARN(
- "Build blob {} was reported as needed for upload but it was reported as existing at the start of the "
- "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization",
- RawHash);
- }
- }
- uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize;
-
- const size_t UploadBlockCount = BlockIndexes.size();
- const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
-
- auto AsyncUploadBlock = [&Storage,
- &BuildId,
- &Work,
- &TempDir,
- &NewBlocks,
- UploadBlockCount,
- &UploadedBlockCount,
- UploadChunkCount,
- &UploadedChunkCount,
- &UploadedBlockSize,
- &UploadStats,
- &FilteredUploadedBytesPerSecond,
- &UploadChunkPool](const size_t BlockIndex,
- const IoHash BlockHash,
- CompositeBuffer&& Payload,
- std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
- bool IsInMemoryBlock = true;
- if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
- {
- ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
- Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempDir, BlockHash));
- IsInMemoryBlock = false;
- }
- else
- {
- QueuedPendingInMemoryBlocksForUpload++;
- }
-
- Work.ScheduleWork(
- UploadChunkPool,
- [&Storage,
- &BuildId,
- &QueuedPendingInMemoryBlocksForUpload,
- &NewBlocks,
- UploadBlockCount,
- &UploadedBlockCount,
- UploadChunkCount,
- &UploadedChunkCount,
- &UploadedBlockSize,
- &UploadStats,
- &FilteredUploadedBytesPerSecond,
- IsInMemoryBlock,
- BlockIndex,
- BlockHash,
- Payload = std::move(Payload)](std::atomic<bool>&) mutable {
- auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] {
- if (IsInMemoryBlock)
- {
- QueuedPendingInMemoryBlocksForUpload--;
- }
- });
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("AsyncUploadBlock");
-
- const uint64_t PayloadSize = Payload.GetSize();
-
- FilteredUploadedBytesPerSecond.Start();
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
-
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
- }
- Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- BlockHash,
- NiceBytes(PayloadSize),
- NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- UploadedBlockSize += PayloadSize;
- UploadStats.BlocksBytes += PayloadSize;
-
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
- }
- bool MetadataSucceeded = Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- if (MetadataSucceeded)
- {
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
-
- NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- }
-
- UploadStats.BlockCount++;
-
- UploadedBlockCount++;
- if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- }
- }
- });
- };
-
- auto AsyncUploadLooseChunk = [&Storage,
- BuildId,
- LargeAttachmentSize,
- &Work,
- &UploadChunkPool,
- &FilteredUploadedBytesPerSecond,
- &UploadedBlockCount,
- &UploadedChunkCount,
- UploadBlockCount,
- UploadChunkCount,
- &UploadedCompressedChunkSize,
- &UploadedRawChunkSize,
- &UploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
- Work.ScheduleWork(
- UploadChunkPool,
- [&Storage,
- BuildId,
- &Work,
- LargeAttachmentSize,
- &FilteredUploadedBytesPerSecond,
- &UploadChunkPool,
- &UploadedBlockCount,
- &UploadedChunkCount,
- UploadBlockCount,
- UploadChunkCount,
- &UploadedCompressedChunkSize,
- &UploadedRawChunkSize,
- &UploadStats,
- RawHash,
- RawSize,
- Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("AsyncUploadLooseChunk");
-
- const uint64_t PayloadSize = Payload.GetSize();
-
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- }
-
- if (PayloadSize >= LargeAttachmentSize)
- {
- ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
- UploadStats.MultipartAttachmentCount++;
- std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob(
- BuildId,
- RawHash,
- ZenContentType::kCompressedBinary,
- PayloadSize,
- [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset,
- uint64_t Size) mutable -> IoBuffer {
- FilteredUploadedBytesPerSecond.Start();
-
- IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer();
- PartPayload.SetContentType(ZenContentType::kBinary);
- return PartPayload;
- },
- [RawSize,
- &UploadStats,
- &UploadedCompressedChunkSize,
- &UploadChunkPool,
- &UploadedBlockCount,
- UploadBlockCount,
- &UploadedChunkCount,
- UploadChunkCount,
- &FilteredUploadedBytesPerSecond,
- &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) {
- UploadStats.ChunksBytes += SentBytes;
- UploadedCompressedChunkSize += SentBytes;
- if (IsComplete)
- {
- UploadStats.ChunkCount++;
- UploadedChunkCount++;
- if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- }
- UploadedRawChunkSize += RawSize;
- }
- });
- for (auto& WorkPart : MultipartWork)
- {
- Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>&) {
- ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
- if (!AbortFlag)
- {
- Work();
- }
- });
- }
- ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize));
- }
- else
- {
- ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
- Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
- UploadStats.ChunksBytes += Payload.GetSize();
- UploadStats.ChunkCount++;
- UploadedCompressedChunkSize += Payload.GetSize();
- UploadedRawChunkSize += RawSize;
- UploadedChunkCount++;
- if (UploadedChunkCount == UploadChunkCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- }
- }
- }
- });
- };
-
- std::vector<size_t> GenerateBlockIndexes;
-
- std::atomic<uint64_t> GeneratedBlockCount = 0;
- std::atomic<uint64_t> GeneratedBlockByteCount = 0;
-
- std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
-
- // Start generation of any non-prebuilt blocks and schedule upload
- for (const size_t BlockIndex : BlockIndexes)
- {
- const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- ReadChunkPool,
- [BlockHash = IoHash(BlockHash),
- BlockIndex,
- &FilteredGenerateBlockBytesPerSecond,
- Path,
- &Content,
- &Lookup,
- &NewBlocks,
- &NewBlockChunks,
- &GenerateBlockIndexes,
- &GeneratedBlockCount,
- &GeneratedBlockByteCount,
- &DiskStats,
- &AsyncUploadBlock,
- &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
-
- FilteredGenerateBlockBytesPerSecond.Start();
-
- Stopwatch GenerateTimer;
- CompositeBuffer Payload;
- if (NewBlocks.BlockHeaders[BlockIndex])
- {
- Payload = RebuildBlock(Path,
- Content,
- Lookup,
- std::move(NewBlocks.BlockHeaders[BlockIndex]),
- NewBlockChunks[BlockIndex],
- DiskStats)
- .GetCompressed();
- }
- else
- {
- ChunkBlockDescription BlockDescription;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
- if (!CompressedBlock)
- {
- throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
- }
- ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
- Payload = std::move(CompressedBlock).GetCompressed();
- }
-
- GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GeneratedBlockCount++;
- if (GeneratedBlockCount == GenerateBlockIndexes.size())
- {
- FilteredGenerateBlockBytesPerSecond.Stop();
- }
- ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}",
- NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
- NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
- if (!AbortFlag)
- {
- AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
- }
- }
- });
- }
- }
-
- // Start compression of any non-precompressed loose chunks and schedule upload
- for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
- {
- const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
- Work.ScheduleWork(
- ReadChunkPool,
- [&Path,
- &Content,
- &Lookup,
- &TempDir,
- &LooseChunksStats,
- &LooseChunkOrderIndexes,
- &FilteredCompressedBytesPerSecond,
- &UploadStats,
- &AsyncUploadLooseChunk,
- ChunkIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
-
- FilteredCompressedBytesPerSecond.Start();
- Stopwatch CompressTimer;
- CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, TempDir, LooseChunksStats);
- ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}",
- Content.ChunkedContent.ChunkHashes[ChunkIndex],
- NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
- const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- UploadStats.ReadFromDiskBytes += ChunkRawSize;
- if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
- {
- FilteredCompressedBytesPerSecond.Stop();
- }
- if (!AbortFlag)
- {
- AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
- }
- }
- });
- }
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load());
- FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
- FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
- uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
- uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
-
- std::string Details = fmt::format(
- "Compressed {}/{} ({}/{} {}B/s) chunks. "
- "Uploaded {}/{} ({}/{}) blobs "
- "({} {}bits/s)",
- LooseChunksStats.CompressedChunkCount.load(),
- LooseChunkOrderIndexes.size(),
- NiceBytes(LooseChunksStats.CompressedChunkRawBytes),
- NiceBytes(TotalLooseChunksSize),
- NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
-
- UploadedBlockCount.load() + UploadedChunkCount.load(),
- UploadBlockCount + UploadChunkCount,
- NiceBytes(UploadedRawSize),
- NiceBytes(TotalRawSize),
-
- NiceBytes(UploadedCompressedSize),
- NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()));
-
- ProgressBar.UpdateState({.Task = "Uploading blobs ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
- .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
-
- ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
-
- ProgressBar.Finish();
-
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
- }
- }
-
- std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks,
- std::span<const IoHash> ChunkHashes,
- std::span<const uint32_t> ChunkIndexes,
- uint8_t MinPercentLimit,
- std::vector<uint32_t>& OutUnusedChunkIndexes,
- FindBlocksStatistics& FindBlocksStats)
- {
- ZEN_TRACE_CPU("FindReuseBlocks");
-
- // Find all blocks with a usage level higher than MinPercentLimit
- // Pick out the blocks with usage higher or equal to MinPercentLimit
- // Sort them with highest size usage - most usage first
- // Make a list of all chunks and mark them as not found
- // For each block, recalculate the block has usage percent based on the chunks marked as not found
- // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list
- // Repeat for following all remaining block that initially matched MinPercentLimit
-
- std::vector<size_t> FilteredReuseBlockIndexes;
-
- uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size());
- std::vector<bool> ChunkFound(ChunkCount, false);
-
- if (ChunkCount > 0)
- {
- if (!KnownBlocks.empty())
- {
- Stopwatch ReuseTimer;
-
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- ChunkHashToChunkIndex.reserve(ChunkIndexes.size());
- for (uint32_t ChunkIndex : ChunkIndexes)
- {
- ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex);
- }
-
- std::vector<size_t> BlockSizes(KnownBlocks.size(), 0);
- std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0);
-
- std::vector<size_t> ReuseBlockIndexes;
-
- for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
- {
- const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- if (KnownBlock.BlockHash != IoHash::Zero &&
- KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size())
- {
- size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size();
- if (BlockAttachmentCount == 0)
- {
- continue;
- }
- size_t ReuseSize = 0;
- size_t BlockSize = 0;
- size_t FoundAttachmentCount = 0;
- size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size();
- for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++)
- {
- const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
- const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
- BlockSize += BlockChunkSize;
- if (ChunkHashToChunkIndex.contains(BlockChunkHash))
- {
- ReuseSize += BlockChunkSize;
- FoundAttachmentCount++;
- }
- }
-
- size_t ReusePercent = (ReuseSize * 100) / BlockSize;
-
- if (ReusePercent >= MinPercentLimit)
- {
- ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
- ReuseBlockIndexes.push_back(KnownBlockIndex);
-
- BlockSizes[KnownBlockIndex] = BlockSize;
- BlockUseSize[KnownBlockIndex] = ReuseSize;
- }
- else if (FoundAttachmentCount > 0)
- {
- // ZEN_CONSOLE_VERBOSE("Skipping block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash,
- // FoundAttachmentCount, ReusePercent);
- FindBlocksStats.RejectedBlockCount++;
- FindBlocksStats.RejectedChunkCount += FoundAttachmentCount;
- FindBlocksStats.RejectedByteCount += ReuseSize;
- }
- }
- }
-
- if (!ReuseBlockIndexes.empty())
- {
- std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) {
- return BlockUseSize[Lhs] > BlockUseSize[Rhs];
- });
-
- for (size_t KnownBlockIndex : ReuseBlockIndexes)
- {
- std::vector<uint32_t> FoundChunkIndexes;
- size_t BlockSize = 0;
- size_t AdjustedReuseSize = 0;
- size_t AdjustedRawReuseSize = 0;
- const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++)
- {
- const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
- const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
- BlockSize += BlockChunkSize;
- if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end())
- {
- const uint32_t ChunkIndex = It->second;
- if (!ChunkFound[ChunkIndex])
- {
- FoundChunkIndexes.push_back(ChunkIndex);
- AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
- AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex];
- }
- }
- }
-
- size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize;
-
- if (ReusePercent >= MinPercentLimit)
- {
- ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundChunkIndexes.size(),
- ReusePercent);
- FilteredReuseBlockIndexes.push_back(KnownBlockIndex);
-
- for (uint32_t ChunkIndex : FoundChunkIndexes)
- {
- ChunkFound[ChunkIndex] = true;
- }
- FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size();
- FindBlocksStats.AcceptedByteCount += AdjustedReuseSize;
- FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize;
- FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size();
- FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize;
- }
- else
- {
- // ZEN_CONSOLE_VERBOSE("Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent);
- FindBlocksStats.RejectedBlockCount++;
- FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size();
- FindBlocksStats.RejectedByteCount += AdjustedReuseSize;
- }
- }
- }
- }
- OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - FindBlocksStats.AcceptedChunkCount);
- for (uint32_t ChunkIndex : ChunkIndexes)
- {
- if (!ChunkFound[ChunkIndex])
- {
- OutUnusedChunkIndexes.push_back(ChunkIndex);
- }
- }
- }
- return FilteredReuseBlockIndexes;
- };
-
void UploadFolder(StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
@@ -3110,1104 +1573,242 @@ namespace {
bool AllowMultiparts,
const CbObject& MetaData,
bool CreateBuild,
- bool IgnoreExistingBlocks,
- bool PostUploadVerify)
+ bool IgnoreExistingBlocks)
{
- ZEN_TRACE_CPU("UploadFolder");
-
ProgressBar::SetLogOperationName(ProgressMode, "Upload Folder");
-
- enum TaskSteps : uint32_t
{
- PrepareBuild,
- CalculateDelta,
- Upload,
- Validate,
- Cleanup,
- StepCount
- };
-
- auto EndProgress =
- MakeGuard([&]() { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::StepCount, TaskSteps::StepCount); });
+ Stopwatch UploadTimer;
- Stopwatch ProcessTimer;
+ ConsoleOpLogOutput Output(ProgressMode);
- CreateDirectories(TempDir);
- CleanDirectory(TempDir, {});
- auto _ = MakeGuard([&]() {
- if (CleanDirectory(TempDir, {}))
+ BuildsOperationUploadFolder UploadOp(
+ Output,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ GetIOWorkerPool(),
+ GetNetworkPool(),
+ BuildId,
+ BuildPartId,
+ BuildPartName,
+ Path,
+ ManifestPath,
+ CreateBuild,
+ std::move(MetaData),
+ BuildsOperationUploadFolder::Options{.IsQuiet = IsQuiet,
+ .IsVerbose = IsVerbose,
+ .FindBlockMaxCount = FindBlockMaxCount,
+ .BlockReuseMinPercentLimit = BlockReuseMinPercentLimit,
+ .AllowMultiparts = AllowMultiparts,
+ .IgnoreExistingBlocks = IgnoreExistingBlocks,
+ .TempDir = TempDir,
+ .ExcludeFolders = DefaultExcludeFolders,
+ .ExcludeExtensions = DefaultExcludeExtensions,
+ .ZenExcludeManifestName = ZenExcludeManifestName});
+ UploadOp.Execute();
+ if (AbortFlag)
{
- std::error_code DummyEc;
- RemoveDir(TempDir, DummyEc);
+ return;
}
- });
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::PrepareBuild, TaskSteps::StepCount);
-
- std::uint64_t TotalRawSize = 0;
-
- CbObject ChunkerParameters;
-
- struct PrepareBuildResult
- {
- std::vector<ChunkBlockDescription> KnownBlocks;
- uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
- uint64_t PayloadSize = 0;
- uint64_t PrepareBuildTimeMs = 0;
- uint64_t FindBlocksTimeMs = 0;
- uint64_t ElapsedTimeMs = 0;
- };
-
- FindBlocksStatistics FindBlocksStats;
-
- std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(
- std::packaged_task<PrepareBuildResult()>{
- [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
- ZEN_TRACE_CPU("PrepareBuild");
-
- PrepareBuildResult Result;
- Stopwatch Timer;
- if (CreateBuild)
- {
- ZEN_TRACE_CPU("CreateBuild");
-
- Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
- Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
- Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
- Result.PayloadSize = MetaData.GetSize();
- }
- else
- {
- ZEN_TRACE_CPU("PutBuild");
- Stopwatch GetBuildTimer;
- CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
- Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
- Result.PayloadSize = Build.GetSize();
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
- {
- Result.PreferredMultipartChunkSize = ChunkSize;
- }
- else if (AllowMultiparts)
- {
- ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
- NiceBytes(Result.PreferredMultipartChunkSize));
- }
- }
- if (!IgnoreExistingBlocks)
- {
- ZEN_TRACE_CPU("FindBlocks");
- Stopwatch KnownBlocksTimer;
- CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount);
- if (BlockDescriptionList)
- {
- Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
- }
- FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
- FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
- Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
- }
- Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- return Result;
- }},
- WorkerThreadPool::EMode::EnableBacklog);
-
- ChunkedFolderContent LocalContent;
+ ZEN_CONSOLE_VERBOSE(
+ "Folder scanning stats:"
+ "\n FoundFileCount: {}"
+ "\n FoundFileByteCount: {}"
+ "\n AcceptedFileCount: {}"
+ "\n AcceptedFileByteCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()),
+ UploadOp.m_LocalFolderScanStats.AcceptedFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.AcceptedFileByteCount.load()),
+ NiceLatencyNs(UploadOp.m_LocalFolderScanStats.ElapsedWallTimeUS * 1000));
- GetFolderContentStatistics LocalFolderScanStats;
- ChunkingStatistics ChunkingStats;
- {
- auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
- for (const std::string& ExcludeFolder : ExcludeFolders)
- {
- if (RelativePath.starts_with(ExcludeFolder))
- {
- if (RelativePath.length() == ExcludeFolder.length())
- {
- return false;
- }
- else if (RelativePath[ExcludeFolder.length()] == '/')
- {
- return false;
- }
- }
- }
- return true;
- };
+ ZEN_CONSOLE_VERBOSE(
+ "Chunking stats:"
+ "\n FilesProcessed: {}"
+ "\n FilesChunked: {}"
+ "\n BytesHashed: {}"
+ "\n UniqueChunksFound: {}"
+ "\n UniqueSequencesFound: {}"
+ "\n UniqueBytesFound: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_ChunkingStats.FilesProcessed.load(),
+ UploadOp.m_ChunkingStats.FilesChunked.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.BytesHashed.load()),
+ UploadOp.m_ChunkingStats.UniqueChunksFound.load(),
+ UploadOp.m_ChunkingStats.UniqueSequencesFound.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.UniqueBytesFound.load()),
+ NiceLatencyNs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS * 1000));
- auto IsAcceptedFile = [ExcludeExtensions =
- DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
- for (const std::string& ExcludeExtension : ExcludeExtensions)
- {
- if (RelativePath.ends_with(ExcludeExtension))
- {
- return false;
- }
- }
- return true;
- };
+ ZEN_CONSOLE_VERBOSE(
+ "Find block stats:"
+ "\n FindBlockTimeMS: {}"
+ "\n PotentialChunkCount: {}"
+ "\n PotentialChunkByteCount: {}"
+ "\n FoundBlockCount: {}"
+ "\n FoundBlockChunkCount: {}"
+ "\n FoundBlockByteCount: {}"
+ "\n AcceptedBlockCount: {}"
+ "\n AcceptedChunkCount: {}"
+ "\n AcceptedByteCount: {}"
+ "\n AcceptedRawByteCount: {}"
+ "\n RejectedBlockCount: {}"
+ "\n RejectedChunkCount: {}"
+ "\n RejectedByteCount: {}"
+ "\n AcceptedReduntantChunkCount: {}"
+ "\n AcceptedReduntantByteCount: {}"
+ "\n NewBlocksCount: {}"
+ "\n NewBlocksChunkCount: {}"
+ "\n NewBlocksChunkByteCount: {}",
+ NiceTimeSpanMs(UploadOp.m_FindBlocksStats.FindBlockTimeMS),
+ UploadOp.m_FindBlocksStats.PotentialChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.PotentialChunkByteCount),
+ UploadOp.m_FindBlocksStats.FoundBlockCount,
+ UploadOp.m_FindBlocksStats.FoundBlockChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.FoundBlockByteCount),
+ UploadOp.m_FindBlocksStats.AcceptedBlockCount,
+ UploadOp.m_FindBlocksStats.AcceptedChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.AcceptedByteCount),
+ NiceBytes(UploadOp.m_FindBlocksStats.AcceptedRawByteCount),
+ UploadOp.m_FindBlocksStats.RejectedBlockCount,
+ UploadOp.m_FindBlocksStats.RejectedChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.RejectedByteCount),
+ UploadOp.m_FindBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.AcceptedReduntantByteCount),
+ UploadOp.m_FindBlocksStats.NewBlocksCount,
+ UploadOp.m_FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount));
- auto ParseManifest = [](const std::filesystem::path& Path,
- const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> {
- std::vector<std::filesystem::path> AssetPaths;
- std::filesystem::path AbsoluteManifestPath =
- MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath);
- IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten();
- std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize());
- std::string_view::size_type Offset = 0;
- while (Offset < ManifestContent.GetSize())
- {
- size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset);
- if (PathBreakOffset == std::string_view::npos)
- {
- PathBreakOffset = ManifestContent.GetSize();
- }
- std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset);
- if (!AssetPath.empty())
- {
- AssetPaths.emplace_back(std::filesystem::path(AssetPath));
- }
- Offset = PathBreakOffset;
- size_t EolOffset = ManifestString.find_first_of("\r\n", Offset);
- if (EolOffset == std::string_view::npos)
- {
- break;
- }
- Offset = EolOffset;
- size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset);
- if (LineBreakOffset == std::string_view::npos)
- {
- break;
- }
- Offset = LineBreakOffset;
- }
- return AssetPaths;
- };
+ ZEN_CONSOLE_VERBOSE(
+ "Generate blocks stats:"
+ "\n GeneratedBlockByteCount: {}"
+ "\n GeneratedBlockCount: {}"
+ "\n GenerateBlocksElapsedWallTimeUS: {}",
+ NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceLatencyNs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000));
- Stopwatch ScanTimer;
- FolderContent Content;
- if (ManifestPath.empty())
- {
- std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName;
- tsl::robin_set<std::string> ExcludeAssetPaths;
- if (IsFile(ExcludeManifestPath))
- {
- std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ExcludeManifestPath);
- ExcludeAssetPaths.reserve(AssetPaths.size());
- for (const std::filesystem::path& AssetPath : AssetPaths)
- {
- ExcludeAssetPaths.insert(AssetPath.generic_string());
- }
- }
- Content = GetFolderContent(
- LocalFolderScanStats,
- Path,
- std::move(IsAcceptedFolder),
- [&IsAcceptedFile,
- &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool {
- if (RelativePath == ZenExcludeManifestName)
- {
- return false;
- }
- if (!IsAcceptedFile(RelativePath, Size, Attributes))
- {
- return false;
- }
- if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
- {
- return false;
- }
- return true;
- },
- GetIOWorkerPool(),
- GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
- ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
- },
- AbortFlag);
- }
- else
- {
- Stopwatch ManifestParseTimer;
- std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ManifestPath);
- for (const std::filesystem::path& AssetPath : AssetPaths)
- {
- Content.Paths.push_back(AssetPath);
- const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred();
- Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
-#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
-#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(AssetFilePath));
-#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
- LocalFolderScanStats.AcceptedFileCount++;
- }
- if (ManifestPath.is_relative())
- {
- Content.Paths.push_back(ManifestPath);
- const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred();
- Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
-#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
-#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(ManifestFilePath));
-#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ ZEN_CONSOLE_VERBOSE(
+ "Generate blocks stats:"
+ "\n ChunkCount: {}"
+ "\n ChunkByteCount: {}"
+ "\n CompressedChunkCount: {}"
+ "\n CompressChunksElapsedWallTimeUS: {}",
+ UploadOp.m_LooseChunksStats.ChunkCount,
+ NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount),
+ UploadOp.m_LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()),
+ NiceLatencyNs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000));
- LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
- LocalFolderScanStats.AcceptedFileCount++;
- }
- LocalFolderScanStats.FoundFileByteCount.store(LocalFolderScanStats.AcceptedFileByteCount);
- LocalFolderScanStats.FoundFileCount.store(LocalFolderScanStats.AcceptedFileCount);
- LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
- }
+ ZEN_CONSOLE_VERBOSE(
+ "Disk stats:"
+ "\n OpenReadCount: {}"
+ "\n OpenWriteCount: {}"
+ "\n ReadCount: {}"
+ "\n ReadByteCount: {}"
+ "\n WriteCount: {} ({} cloned)"
+ "\n WriteByteCount: {} ({} cloned)"
+ "\n CurrentOpenFileCount: {}",
+ UploadOp.m_DiskStats.OpenReadCount.load(),
+ UploadOp.m_DiskStats.OpenWriteCount.load(),
+ UploadOp.m_DiskStats.ReadCount.load(),
+ NiceBytes(UploadOp.m_DiskStats.ReadByteCount.load()),
+ UploadOp.m_DiskStats.WriteCount.load(),
+ UploadOp.m_DiskStats.CloneCount.load(),
+ NiceBytes(UploadOp.m_DiskStats.WriteByteCount.load()),
+ NiceBytes(UploadOp.m_DiskStats.CloneByteCount.load()),
+ UploadOp.m_DiskStats.CurrentOpenFileCount.load());
- std::unique_ptr<ChunkingController> ChunkController =
- CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
- {
- CbObjectWriter ChunkParametersWriter;
- ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
- ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters());
- ChunkerParameters = ChunkParametersWriter.Save();
- }
-
- TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
-
- {
- ProgressBar ProgressBar(ProgressMode, "Scan Files");
- FilteredRate FilteredBytesHashed;
- FilteredBytesHashed.Start();
- LocalContent = ChunkFolderContent(
- ChunkingStats,
- GetIOWorkerPool(),
- Path,
- Content,
- *ChunkController,
- GetUpdateDelayMS(ProgressMode),
- [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
- FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
- std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- Content.Paths.size(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- NiceBytes(TotalRawSize),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load()));
- ProgressBar.UpdateState({.Task = "Scanning files ",
- .Details = Details,
- .TotalCount = TotalRawSize,
- .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- },
- AbortFlag,
- PauseFlag);
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
- if (AbortFlag)
- {
- return;
- }
- }
+ ZEN_CONSOLE_VERBOSE(
+ "Upload stats:"
+ "\n BlockCount: {}"
+ "\n BlocksBytes: {}"
+ "\n ChunkCount: {}"
+ "\n ChunksBytes: {}"
+ "\n ReadFromDiskBytes: {}"
+ "\n MultipartAttachmentCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_UploadStats.BlockCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()),
+ UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()),
+ NiceBytes(UploadOp.m_UploadStats.ReadFromDiskBytes.load()),
+ UploadOp.m_UploadStats.MultipartAttachmentCount.load(),
+ NiceLatencyNs(UploadOp.m_UploadStats.ElapsedWallTimeUS * 1000));
+
+ const double DeltaByteCountPercent =
+ UploadOp.m_ChunkingStats.BytesHashed > 0
+ ? (100.0 * (UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes)) /
+ (UploadOp.m_ChunkingStats.BytesHashed)
+ : 0.0;
+
+ const std::string MultipartAttachmentStats =
+ AllowMultiparts ? fmt::format(" ({} as multipart)", UploadOp.m_UploadStats.MultipartAttachmentCount.load()) : "";
if (!IsQuiet)
{
- ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
- LocalContent.Paths.size(),
- NiceBytes(TotalRawSize),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load()),
- Path,
- NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
- NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)));
- }
- }
-
- const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
-
- GenerateBlocksStatistics GenerateBlocksStats;
- LooseChunksStatistics LooseChunksStats;
-
- std::vector<size_t> ReuseBlockIndexes;
- std::vector<uint32_t> NewBlockChunkIndexes;
-
- PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get();
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Build prepare took {}. {} took {}, payload size {}{}",
- NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
- CreateBuild ? "PutBuild" : "GetBuild",
- NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
- NiceBytes(PrepBuildResult.PayloadSize),
- IgnoreExistingBlocks ? ""
- : fmt::format(". Found {} blocks in {}",
- PrepBuildResult.KnownBlocks.size(),
- NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
- }
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::CalculateDelta, TaskSteps::StepCount);
-
- const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
-
- Stopwatch BlockArrangeTimer;
-
- std::vector<std::uint32_t> LooseChunkIndexes;
- {
- bool EnableBlocks = true;
- std::vector<std::uint32_t> BlockChunkIndexes;
- for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
- {
- const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > DefaultChunksBlockParams.MaxChunkEmbedSize)
- {
- LooseChunkIndexes.push_back(ChunkIndex);
- LooseChunksStats.ChunkByteCount += ChunkRawSize;
- }
- else
- {
- BlockChunkIndexes.push_back(ChunkIndex);
- FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
- }
- }
- FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size();
- LooseChunksStats.ChunkCount = LooseChunkIndexes.size();
-
- if (IgnoreExistingBlocks)
- {
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Ignoring any existing blocks in store");
- }
- NewBlockChunkIndexes = std::move(BlockChunkIndexes);
- }
- else
- {
- ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks,
- LocalContent.ChunkedContent.ChunkHashes,
- BlockChunkIndexes,
- BlockReuseMinPercentLimit,
- NewBlockChunkIndexes,
- FindBlocksStats);
- FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size();
-
- for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks)
- {
- for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
- {
- FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
- }
- FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
- }
- }
- }
-
- std::vector<std::vector<uint32_t>> NewBlockChunks;
- ArrangeChunksIntoBlocks(LocalContent,
- LocalLookup,
- DefaultChunksBlockParams.MaxBlockSize,
- DefaultChunksBlockParams.MaxChunksPerBlock,
- NewBlockChunkIndexes,
- NewBlockChunks);
-
- FindBlocksStats.NewBlocksCount = NewBlockChunks.size();
- for (uint32_t ChunkIndex : NewBlockChunkIndexes)
- {
- FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- }
- FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
-
- const double AcceptedByteCountPercent =
- FindBlocksStats.PotentialChunkByteCount > 0
- ? (100.0 * FindBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount)
- : 0.0;
-
- const double AcceptedReduntantByteCountPercent =
- FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) /
- (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount)
- : 0.0;
- if (!IsQuiet)
- {
- ZEN_CONSOLE(
- "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
- " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
- " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
- " Rejected {} ({}) chunks in {} blocks\n"
- " Arranged {} ({}) chunks in {} new blocks\n"
- " Keeping {} ({}) chunks as loose chunks\n"
- " Discovery completed in {}",
- FindBlocksStats.FoundBlockChunkCount,
- FindBlocksStats.FoundBlockCount,
- NiceBytes(FindBlocksStats.FoundBlockByteCount),
- NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS),
-
- FindBlocksStats.AcceptedChunkCount,
- NiceBytes(FindBlocksStats.AcceptedRawByteCount),
- FindBlocksStats.AcceptedBlockCount,
- AcceptedByteCountPercent,
-
- FindBlocksStats.AcceptedReduntantChunkCount,
- NiceBytes(FindBlocksStats.AcceptedReduntantByteCount),
- AcceptedReduntantByteCountPercent,
-
- FindBlocksStats.RejectedChunkCount,
- NiceBytes(FindBlocksStats.RejectedByteCount),
- FindBlocksStats.RejectedBlockCount,
-
- FindBlocksStats.NewBlocksChunkCount,
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount),
- FindBlocksStats.NewBlocksCount,
-
- LooseChunksStats.ChunkCount,
- NiceBytes(LooseChunksStats.ChunkByteCount),
-
- NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
- }
-
- DiskStatistics DiskStats;
- UploadStatistics UploadStats;
- GeneratedBlocks NewBlocks;
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Upload, TaskSteps::StepCount);
-
- if (!NewBlockChunks.empty())
- {
- Stopwatch GenerateBuildBlocksTimer;
- auto __ = MakeGuard([&]() {
- uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
- GenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount),
- UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes.load()),
- NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
- NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
- GenerateBlocksStats.GeneratedBlockByteCount)),
- NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8)));
- }
- });
- GenerateBuildBlocks(Path,
- LocalContent,
- LocalLookup,
- Storage,
- BuildId,
- NewBlockChunks,
- NewBlocks,
- DiskStats,
- UploadStats,
- GenerateBlocksStats);
- }
-
- CbObject PartManifest;
- {
- CbObjectWriter PartManifestWriter;
- Stopwatch ManifestGenerationTimer;
- auto __ = MakeGuard([&]() {
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Generated build part manifest in {} ({})",
- NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
- NiceBytes(PartManifestWriter.GetSaveSize()));
- }
- });
- PartManifestWriter.AddObject("chunker"sv, ChunkerParameters);
-
- std::vector<IoHash> AllChunkBlockHashes;
- std::vector<ChunkBlockDescription> AllChunkBlockDescriptions;
- AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
- AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
- for (size_t ReuseBlockIndex : ReuseBlockIndexes)
- {
- AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]);
- AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash);
- }
- AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(),
- NewBlocks.BlockDescriptions.begin(),
- NewBlocks.BlockDescriptions.end());
- for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
- {
- AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
- }
-#if EXTRA_VERIFY
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
- std::vector<IoHash> AbsoluteChunkHashes;
- AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
- for (uint32_t ChunkIndex : LooseChunkIndexes)
- {
- ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
- AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- }
- for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
- {
- for (const IoHash& ChunkHash : Block.ChunkHashes)
- {
- ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
- AbsoluteChunkHashes.push_back(ChunkHash);
- }
- }
- for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
- {
- ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
- ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
- }
- for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
- {
- ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
- LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
- }
-#endif // EXTRA_VERIFY
- std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
- LocalContent.ChunkedContent.ChunkOrders,
- LocalLookup.ChunkHashToChunkIndex,
- LooseChunkIndexes,
- AllChunkBlockDescriptions);
-
-#if EXTRA_VERIFY
- for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
- {
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
- uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
- const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
- ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
- }
-#endif // EXTRA_VERIFY
-
- WriteBuildContentToCompactBinary(PartManifestWriter,
- LocalContent.Platform,
- LocalContent.Paths,
- LocalContent.RawHashes,
- LocalContent.RawSizes,
- LocalContent.Attributes,
- LocalContent.ChunkedContent.SequenceRawHashes,
- LocalContent.ChunkedContent.ChunkCounts,
- LocalContent.ChunkedContent.ChunkHashes,
- LocalContent.ChunkedContent.ChunkRawSizes,
- AbsoluteChunkOrders,
- LooseChunkIndexes,
- AllChunkBlockHashes);
-
-#if EXTRA_VERIFY
- {
- ChunkedFolderContent VerifyFolderContent;
-
- std::vector<uint32_t> OutAbsoluteChunkOrders;
- std::vector<IoHash> OutLooseChunkHashes;
- std::vector<uint64_t> OutLooseChunkRawSizes;
- std::vector<IoHash> OutBlockRawHashes;
- ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
- VerifyFolderContent.Platform,
- VerifyFolderContent.Paths,
- VerifyFolderContent.RawHashes,
- VerifyFolderContent.RawSizes,
- VerifyFolderContent.Attributes,
- VerifyFolderContent.ChunkedContent.SequenceRawHashes,
- VerifyFolderContent.ChunkedContent.ChunkCounts,
- OutAbsoluteChunkOrders,
- OutLooseChunkHashes,
- OutLooseChunkRawSizes,
- OutBlockRawHashes);
- ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
-
- for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
- {
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
-
- uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
- const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
-
- ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
- }
-
- CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
- OutLooseChunkHashes,
- OutLooseChunkRawSizes,
- AllChunkBlockDescriptions,
- VerifyFolderContent.ChunkedContent.ChunkHashes,
- VerifyFolderContent.ChunkedContent.ChunkRawSizes,
- VerifyFolderContent.ChunkedContent.ChunkOrders);
-
- ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
- ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
- ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
- ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
- ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
- ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
-
- for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
- {
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
-
- uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
- uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
-
- ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
- ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
- }
- }
-#endif // EXTRA_VERIFY
- PartManifest = PartManifestWriter.Save();
- }
-
- Stopwatch PutBuildPartResultTimer;
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
- Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
- if (!IsQuiet)
- {
- ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.",
- NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
- NiceBytes(PartManifest.GetSize()),
- PutBuildPartResult.second.size());
- }
- IoHash PartHash = PutBuildPartResult.first;
-
- auto UploadAttachments = [&Storage,
- &BuildId,
- &Path,
- &TempDir,
- &LocalContent,
- &LocalLookup,
- &NewBlockChunks,
- &NewBlocks,
- &LooseChunkIndexes,
- &LargeAttachmentSize,
- &DiskStats,
- &UploadStats,
- &LooseChunksStats](std::span<IoHash> RawHashes) {
- if (!AbortFlag)
- {
- UploadStatistics TempUploadStats;
- LooseChunksStatistics TempLooseChunksStats;
-
- Stopwatch TempUploadTimer;
- auto __ = MakeGuard([&]() {
- if (!IsQuiet)
- {
- uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
- ZEN_CONSOLE(
- "Uploaded {} ({}) blocks. "
- "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
- "Transferred {} ({}bits/s) in {}",
- TempUploadStats.BlockCount.load(),
- NiceBytes(TempUploadStats.BlocksBytes),
-
- TempLooseChunksStats.CompressedChunkCount.load(),
- NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
- NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
- TempLooseChunksStats.ChunkByteCount)),
- TempUploadStats.ChunkCount.load(),
- NiceBytes(TempUploadStats.ChunksBytes),
-
- NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
- NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)),
- NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
- }
- });
- UploadPartBlobs(Storage,
- BuildId,
- Path,
- TempDir,
- LocalContent,
- LocalLookup,
- RawHashes,
- NewBlockChunks,
- NewBlocks,
- LooseChunkIndexes,
- LargeAttachmentSize,
- DiskStats,
- TempUploadStats,
- TempLooseChunksStats);
- UploadStats += TempUploadStats;
- LooseChunksStats += TempLooseChunksStats;
- }
- };
- if (IgnoreExistingBlocks)
- {
- ZEN_CONSOLE_VERBOSE("PutBuildPart uploading all attachments, needs are: {}",
- FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ ZEN_CONSOLE(
+ "Uploaded part {} ('{}') to build {}, {}\n"
+ " Scanned files: {:>8} ({}), {}B/sec, {}\n"
+ " New data: {:>8} ({}) {:.1f}%\n"
+ " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n"
+ " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n"
+ " Uploaded: {:>8} ({}), {}bits/sec, {}\n"
+ " Blocks: {:>8} ({})\n"
+ " Chunks: {:>8} ({}){}",
+ BuildPartId,
+ BuildPartName,
+ BuildId,
+ NiceTimeSpanMs(UploadTimer.GetElapsedTimeMs()),
- std::vector<IoHash> ForceUploadChunkHashes;
- ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
+ UploadOp.m_LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_ChunkingStats.ElapsedWallTimeUS, UploadOp.m_ChunkingStats.BytesHashed)),
+ NiceTimeSpanMs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS / 1000),
- for (uint32_t ChunkIndex : LooseChunkIndexes)
- {
- ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- }
+ UploadOp.m_FindBlocksStats.NewBlocksChunkCount + UploadOp.m_LooseChunksStats.CompressedChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes),
+ DeltaByteCountPercent,
- for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
- {
- if (NewBlocks.BlockHeaders[BlockIndex])
- {
- // Block was not uploaded during generation
- ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
- }
- }
- UploadAttachments(ForceUploadChunkHashes);
- }
- else if (!PutBuildPartResult.second.empty())
- {
- ZEN_CONSOLE_VERBOSE("PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
- UploadAttachments(PutBuildPartResult.second);
- }
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount),
+ NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceTimeSpanMs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000),
- uint32_t FinalizeBuildPartRetryCount = 5;
- while (!AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
- {
- Stopwatch FinalizeBuildPartTimer;
- std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash);
- if (!IsQuiet)
- {
- ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.",
- NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
- Needs.size());
- }
- if (Needs.empty())
- {
- break;
- }
- ZEN_CONSOLE_VERBOSE("FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
- UploadAttachments(Needs);
- }
+ UploadOp.m_LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount),
+ NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS,
+ UploadOp.m_LooseChunksStats.ChunkByteCount)),
+ NiceTimeSpanMs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000),
- if (CreateBuild && !AbortFlag)
- {
- Stopwatch FinalizeBuildTimer;
- Storage.BuildStorage->FinalizeBuild(BuildId);
- if (!IsQuiet)
- {
- ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
- }
- }
+ UploadOp.m_UploadStats.BlockCount.load() + UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes + UploadOp.m_UploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(UploadOp.m_UploadStats.ElapsedWallTimeUS,
+ (UploadOp.m_UploadStats.ChunksBytes + UploadOp.m_UploadStats.BlocksBytes) * 8)),
+ NiceTimeSpanMs(UploadOp.m_UploadStats.ElapsedWallTimeUS / 1000),
- if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag)
- {
- uint64_t UploadBlockMetadataCount = 0;
- Stopwatch UploadBlockMetadataTimer;
+ UploadOp.m_UploadStats.BlockCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()),
- uint32_t FailedMetadataUploadCount = 1;
- int32_t MetadataUploadRetryCount = 3;
- while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
- {
- FailedMetadataUploadCount = 0;
- for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
- const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
- if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
- {
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
- }
- bool MetadataSucceeded = Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- if (MetadataSucceeded)
- {
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadBlockMetadataCount++;
- }
- else
- {
- FailedMetadataUploadCount++;
- }
- }
- }
+ UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()),
+ MultipartAttachmentStats);
}
- if (UploadBlockMetadataCount > 0)
- {
- uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
- UploadStats.ElapsedWallTimeUS += ElapsedUS;
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000));
- }
- }
- }
-
- ValidateStatistics ValidateStats;
- DownloadStatistics ValidateDownloadStats;
- if (PostUploadVerify && !AbortFlag)
- {
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Validate, TaskSteps::StepCount);
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
- }
-
- ZEN_CONSOLE_VERBOSE(
- "Folder scanning stats:"
- "\n FoundFileCount: {}"
- "\n FoundFileByteCount: {}"
- "\n AcceptedFileCount: {}"
- "\n AcceptedFileByteCount: {}"
- "\n ElapsedWallTimeUS: {}",
- LocalFolderScanStats.FoundFileCount.load(),
- NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()),
- LocalFolderScanStats.AcceptedFileCount.load(),
- NiceBytes(LocalFolderScanStats.AcceptedFileByteCount.load()),
- NiceLatencyNs(LocalFolderScanStats.ElapsedWallTimeUS * 1000));
-
- ZEN_CONSOLE_VERBOSE(
- "Chunking stats:"
- "\n FilesProcessed: {}"
- "\n FilesChunked: {}"
- "\n BytesHashed: {}"
- "\n UniqueChunksFound: {}"
- "\n UniqueSequencesFound: {}"
- "\n UniqueBytesFound: {}"
- "\n ElapsedWallTimeUS: {}",
- ChunkingStats.FilesProcessed.load(),
- ChunkingStats.FilesChunked.load(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- ChunkingStats.UniqueChunksFound.load(),
- ChunkingStats.UniqueSequencesFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load()),
- NiceLatencyNs(ChunkingStats.ElapsedWallTimeUS * 1000));
-
- ZEN_CONSOLE_VERBOSE(
- "Find block stats:"
- "\n FindBlockTimeMS: {}"
- "\n PotentialChunkCount: {}"
- "\n PotentialChunkByteCount: {}"
- "\n FoundBlockCount: {}"
- "\n FoundBlockChunkCount: {}"
- "\n FoundBlockByteCount: {}"
- "\n AcceptedBlockCount: {}"
- "\n AcceptedChunkCount: {}"
- "\n AcceptedByteCount: {}"
- "\n AcceptedRawByteCount: {}"
- "\n RejectedBlockCount: {}"
- "\n RejectedChunkCount: {}"
- "\n RejectedByteCount: {}"
- "\n AcceptedReduntantChunkCount: {}"
- "\n AcceptedReduntantByteCount: {}"
- "\n NewBlocksCount: {}"
- "\n NewBlocksChunkCount: {}"
- "\n NewBlocksChunkByteCount: {}",
- NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS),
- FindBlocksStats.PotentialChunkCount,
- NiceBytes(FindBlocksStats.PotentialChunkByteCount),
- FindBlocksStats.FoundBlockCount,
- FindBlocksStats.FoundBlockChunkCount,
- NiceBytes(FindBlocksStats.FoundBlockByteCount),
- FindBlocksStats.AcceptedBlockCount,
- FindBlocksStats.AcceptedChunkCount,
- NiceBytes(FindBlocksStats.AcceptedByteCount),
- NiceBytes(FindBlocksStats.AcceptedRawByteCount),
- FindBlocksStats.RejectedBlockCount,
- FindBlocksStats.RejectedChunkCount,
- NiceBytes(FindBlocksStats.RejectedByteCount),
- FindBlocksStats.AcceptedReduntantChunkCount,
- NiceBytes(FindBlocksStats.AcceptedReduntantByteCount),
- FindBlocksStats.NewBlocksCount,
- FindBlocksStats.NewBlocksChunkCount,
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount));
-
- ZEN_CONSOLE_VERBOSE(
- "Generate blocks stats:"
- "\n GeneratedBlockByteCount: {}"
- "\n GeneratedBlockCount: {}"
- "\n GenerateBlocksElapsedWallTimeUS: {}",
- NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
- GenerateBlocksStats.GeneratedBlockCount.load(),
- NiceLatencyNs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000));
-
- ZEN_CONSOLE_VERBOSE(
- "Generate blocks stats:"
- "\n ChunkCount: {}"
- "\n ChunkByteCount: {}"
- "\n CompressedChunkCount: {}"
- "\n CompressChunksElapsedWallTimeUS: {}",
- LooseChunksStats.ChunkCount,
- NiceBytes(LooseChunksStats.ChunkByteCount),
- LooseChunksStats.CompressedChunkCount.load(),
- NiceBytes(LooseChunksStats.CompressedChunkBytes.load()),
- NiceLatencyNs(LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000));
-
- ZEN_CONSOLE_VERBOSE(
- "Disk stats:"
- "\n OpenReadCount: {}"
- "\n OpenWriteCount: {}"
- "\n ReadCount: {}"
- "\n ReadByteCount: {}"
- "\n WriteCount: {} ({} cloned)"
- "\n WriteByteCount: {} ({} cloned)"
- "\n CurrentOpenFileCount: {}",
- DiskStats.OpenReadCount.load(),
- DiskStats.OpenWriteCount.load(),
- DiskStats.ReadCount.load(),
- NiceBytes(DiskStats.ReadByteCount.load()),
- DiskStats.WriteCount.load(),
- DiskStats.CloneCount.load(),
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceBytes(DiskStats.CloneByteCount.load()),
- DiskStats.CurrentOpenFileCount.load());
-
- ZEN_CONSOLE_VERBOSE(
- "Upload stats:"
- "\n BlockCount: {}"
- "\n BlocksBytes: {}"
- "\n ChunkCount: {}"
- "\n ChunksBytes: {}"
- "\n ReadFromDiskBytes: {}"
- "\n MultipartAttachmentCount: {}"
- "\n ElapsedWallTimeUS: {}",
- UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes.load()),
- UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.ChunksBytes.load()),
- NiceBytes(UploadStats.ReadFromDiskBytes.load()),
- UploadStats.MultipartAttachmentCount.load(),
- NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000));
-
- if (PostUploadVerify)
- {
- ZEN_CONSOLE_VERBOSE(
- "Validate stats:"
- "\n BuildBlobSize: {}"
- "\n BuildPartSize: {}"
- "\n ChunkAttachmentCount: {}"
- "\n BlockAttachmentCount: {}"
- "\n VerifiedAttachmentCount: {}"
- "\n VerifiedByteCount: {}"
- "\n ElapsedWallTimeUS: {}",
- NiceBytes(ValidateStats.BuildBlobSize),
- NiceBytes(ValidateStats.BuildPartSize),
- ValidateStats.ChunkAttachmentCount,
- ValidateStats.BlockAttachmentCount,
- ValidateStats.VerifiedAttachmentCount.load(),
- NiceBytes(ValidateStats.VerifiedByteCount.load()),
- NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000));
-
- ZEN_CONSOLE_VERBOSE(
- "Validate download stats:"
- "\n RequestsCompleteCount: {}"
- "\n DownloadedChunkCount: {}"
- "\n DownloadedChunkByteCount: {}"
- "\n MultipartAttachmentCount: {}"
- "\n DownloadedBlockCount: {}"
- "\n DownloadedBlockByteCount: {}"
- "\n DownloadedPartialBlockCount: {}"
- "\n DownloadedPartialBlockByteCount: {}",
- ValidateDownloadStats.RequestsCompleteCount.load(),
- ValidateDownloadStats.DownloadedChunkCount.load(),
- NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()),
- ValidateDownloadStats.MultipartAttachmentCount.load(),
- ValidateDownloadStats.DownloadedBlockCount.load(),
- NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()),
- ValidateDownloadStats.DownloadedPartialBlockCount.load(),
- NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load()));
- }
-
- const double DeltaByteCountPercent =
- ChunkingStats.BytesHashed > 0
- ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
- : 0.0;
-
- const std::string MultipartAttachmentStats =
- (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
-
- std::string ValidateInfo;
- if (PostUploadVerify)
- {
- const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
- const uint64_t DownloadedByteCount =
- ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
- ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}",
- DownloadedCount,
- NiceBytes(DownloadedByteCount),
- NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
- NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
}
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE(
- "Uploaded part {} ('{}') to build {}, {}\n"
- " Scanned files: {:>8} ({}), {}B/sec, {}\n"
- " New data: {:>8} ({}) {:.1f}%\n"
- " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n"
- " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n"
- " Uploaded: {:>8} ({}), {}bits/sec, {}\n"
- " Blocks: {:>8} ({})\n"
- " Chunks: {:>8} ({}){}"
- "{}",
- BuildPartId,
- BuildPartName,
- BuildId,
- NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()),
-
- LocalFolderScanStats.FoundFileCount.load(),
- NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()),
- NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)),
- NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000),
-
- FindBlocksStats.NewBlocksChunkCount + LooseChunksStats.CompressedChunkCount,
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- DeltaByteCountPercent,
-
- GenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount),
- NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
- NiceNum(
- GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)),
- NiceTimeSpanMs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000),
-
- LooseChunksStats.CompressedChunkCount.load(),
- NiceBytes(LooseChunksStats.ChunkByteCount),
- NiceBytes(LooseChunksStats.CompressedChunkBytes.load()),
- NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)),
- NiceTimeSpanMs(LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000),
-
- UploadStats.BlockCount.load() + UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
- NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes) * 8)),
- NiceTimeSpanMs(UploadStats.ElapsedWallTimeUS / 1000),
-
- UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes.load()),
-
- UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.ChunksBytes.load()),
- MultipartAttachmentStats,
-
- ValidateInfo);
- }
-
- Storage.BuildStorage->PutBuildPartStats(
- BuildId,
- BuildPartId,
- {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())},
- {"reusedRatio", AcceptedByteCountPercent / 100.0},
- {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)},
- {"reusedBlockByteCount", double(FindBlocksStats.AcceptedRawByteCount)},
- {"newBlockCount", double(FindBlocksStats.NewBlocksCount)},
- {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)},
- {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())},
- {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())},
- {"uploadedBytesPerSec",
- double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))},
- {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}});
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount);
}
+ struct VerifyFolderStatistics
+ {
+ std::atomic<uint64_t> FilesVerified = 0;
+ std::atomic<uint64_t> FilesFailed = 0;
+ std::atomic<uint64_t> ReadBytes = 0;
+ uint64_t VerifyElapsedWallTimeUs = 0;
+ };
+
void VerifyFolder(const ChunkedFolderContent& Content,
const std::filesystem::path& Path,
bool VerifyFileHash,
@@ -5634,54 +3235,45 @@ namespace {
}
FolderContent LocalFolderState;
- DiskStatistics DiskStats;
- CacheMappingStatistics CacheMappingStats;
- DownloadStatistics DownloadStats;
- WriteChunkStatistics WriteChunkStats;
- RebuildFolderStateStatistics RebuildFolderStateStats;
- VerifyFolderStatistics VerifyFolderStats;
-
const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent);
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount);
- {
- ConsoleOpLogOutput Output(ProgressMode);
- BuildsOperationUpdateFolder Updater(
- Output,
- Storage,
- AbortFlag,
- PauseFlag,
- GetIOWorkerPool(),
- GetNetworkPool(),
- BuildId,
- Path,
- LocalContent,
- LocalLookup,
- RemoteContent,
- RemoteLookup,
- BlockDescriptions,
- LooseChunkHashes,
- BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet,
- .IsVerbose = IsVerbose,
- .AllowFileClone = AllowFileClone,
- .UseSparseFiles = UseSparseFiles,
- .SystemRootDir = Options.SystemRootDir,
- .ZenFolderPath = Options.ZenFolderPath,
- .LargeAttachmentSize = LargeAttachmentSize,
- .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
- .PartialBlockRequestMode = Options.PartialBlockRequestMode,
- .WipeTargetFolder = Options.CleanTargetFolder,
- .PrimeCacheOnly = Options.PrimeCacheOnly,
- .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
- .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging,
- .DefaultExcludeFolders = DefaultExcludeFolders,
- .DefaultExcludeExtensions = DefaultExcludeExtensions},
- DiskStats);
- Updater.Execute(LocalFolderState);
- }
-
+ ConsoleOpLogOutput Output(ProgressMode);
+ BuildsOperationUpdateFolder Updater(
+ Output,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ GetIOWorkerPool(),
+ GetNetworkPool(),
+ BuildId,
+ Path,
+ LocalContent,
+ LocalLookup,
+ RemoteContent,
+ RemoteLookup,
+ BlockDescriptions,
+ LooseChunkHashes,
+ BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet,
+ .IsVerbose = IsVerbose,
+ .AllowFileClone = AllowFileClone,
+ .UseSparseFiles = UseSparseFiles,
+ .SystemRootDir = Options.SystemRootDir,
+ .ZenFolderPath = Options.ZenFolderPath,
+ .LargeAttachmentSize = LargeAttachmentSize,
+ .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
+ .PartialBlockRequestMode = Options.PartialBlockRequestMode,
+ .WipeTargetFolder = Options.CleanTargetFolder,
+ .PrimeCacheOnly = Options.PrimeCacheOnly,
+ .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
+ .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging,
+ .ExcludeFolders = DefaultExcludeFolders,
+ .ExcludeExtensions = DefaultExcludeExtensions});
+ Updater.Execute(LocalFolderState);
+
+ VerifyFolderStatistics VerifyFolderStats;
if (!AbortFlag)
{
if (!Options.PrimeCacheOnly)
@@ -5708,11 +3300,12 @@ namespace {
WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
}
- const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
- DownloadStats.DownloadedPartialBlockCount.load();
- const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
- DownloadStats.DownloadedBlockByteCount.load() +
- DownloadStats.DownloadedPartialBlockByteCount.load();
+ const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() +
+ Updater.m_DownloadStats.DownloadedBlockCount.load() +
+ Updater.m_DownloadStats.DownloadedPartialBlockCount.load();
+ const uint64_t DownloadByteCount = Updater.m_DownloadStats.DownloadedChunkByteCount.load() +
+ Updater.m_DownloadStats.DownloadedBlockByteCount.load() +
+ Updater.m_DownloadStats.DownloadedPartialBlockByteCount.load();
const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs();
if (!IsQuiet)
@@ -5730,15 +3323,15 @@ namespace {
DownloadCount,
NiceBytes(DownloadByteCount),
- NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
+ NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
- DiskStats.WriteCount.load(),
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())),
+ Updater.m_DiskStats.WriteCount.load(),
+ NiceBytes(Updater.m_DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.WriteTimeUs, Updater.m_DiskStats.WriteByteCount.load())),
- NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
+ NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
- NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
+ NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
@@ -7377,8 +4970,30 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
MetaData,
m_CreateBuild,
- m_Clean,
- m_PostUploadVerify);
+ m_Clean);
+
+ if (!AbortFlag)
+ {
+ if (m_PostUploadVerify)
+ {
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
+ std::string ValidateInfo;
+ if (m_PostUploadVerify)
+ {
+ const uint64_t DownloadedCount =
+ ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount =
+ ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ ValidateInfo = fmt::format("Verified: {:>8} ({}), {}B/sec, {}",
+ DownloadedCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ }
+ }
+ }
if (true)
{
@@ -7864,13 +5479,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
MetaData,
true,
- false,
- true);
+ false);
if (AbortFlag)
{
throw std::runtime_error("Test aborted. (Upload build)");
}
+ {
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
+ }
+
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(Storage,
BuildId,
@@ -8050,13 +5670,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
MetaData2,
true,
- false,
- true);
+ false);
if (AbortFlag)
{
throw std::runtime_error("Test aborted. (Upload scrambled)");
}
+ {
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
+ }
+
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(Storage,
BuildId,
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 8c5040397..c7afe702c 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -3063,6 +3063,33 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly)
return Result;
}
+void
+MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path)
+{
+ if (!Path.empty())
+ {
+ std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred();
+#if ZEN_PLATFORM_WINDOWS
+ const std::string_view Prefix = "\\\\?\\";
+ const std::u8string PrefixU8(Prefix.begin(), Prefix.end());
+ std::u8string PathString = AbsolutePath.u8string();
+ if (!PathString.empty() && !PathString.starts_with(PrefixU8))
+ {
+ PathString.insert(0, PrefixU8);
+ Path = PathString;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+ }
+}
+
+std::filesystem::path
+MakeSafeAbsolutePath(const std::filesystem::path& Path)
+{
+ std::filesystem::path Tmp(Path);
+ MakeSafeAbsolutePathÍnPlace(Tmp);
+ return Tmp;
+}
+
class SharedMemoryImpl : public SharedMemory
{
public:
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index 3bfc3b540..b4906aebf 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -408,6 +408,9 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly);
+void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path);
+[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path);
+
class SharedMemory
{
public:
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index f3a585737..a4f4237cd 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -5,6 +5,7 @@
#include <zenremotestore/builds/buildstorage.h>
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkblock.h>
+#include <zenremotestore/chunking/chunkingcontroller.h>
#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
@@ -25,6 +26,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#define EXTRA_VERIFY 0
+
namespace zen {
using namespace std::literals;
@@ -496,8 +499,118 @@ namespace {
}
}
+ const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv),
+ HashStringDjb2(".zip"sv),
+ HashStringDjb2(".7z"sv),
+ HashStringDjb2(".bzip"sv),
+ HashStringDjb2(".rar"sv),
+ HashStringDjb2(".gzip"sv),
+ HashStringDjb2(".apk"sv),
+ HashStringDjb2(".nsp"sv),
+ HashStringDjb2(".xvc"sv),
+ HashStringDjb2(".pkg"sv),
+ HashStringDjb2(".dmg"sv),
+ HashStringDjb2(".ipa"sv)});
+
+ const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end());
+
+ bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); }
+
+ bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+ {
+ ZEN_UNUSED(Content);
+ const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
+ if (ChunkLocationCount == 0)
+ {
+ return false;
+ }
+ const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
+ const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex];
+
+ const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash);
+ return IsCompressable;
+ }
+
+ template<typename T>
+ std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
+ {
+ ExtendableStringBuilder<512> SB;
+ for (const T& Item : Items)
+ {
+ SB.Append(fmt::format("{}{}", Prefix, Item));
+ }
+ return SB.ToString();
+ }
+
} // namespace
+class ReadFileCache
+{
+public:
+ // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten
+ ReadFileCache(DiskStatistics& DiskStats,
+ const std::filesystem::path& Path,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ size_t MaxOpenFileCount)
+ : m_Path(Path)
+ , m_LocalContent(LocalContent)
+ , m_LocalLookup(LocalLookup)
+ , m_DiskStats(DiskStats)
+ {
+ m_OpenFiles.reserve(MaxOpenFileCount);
+ }
+ ~ReadFileCache() { m_OpenFiles.clear(); }
+
+ CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
+ {
+ ZEN_TRACE_CPU("ReadFileCache::GetRange");
+
+ auto CacheIt =
+ std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; });
+ if (CacheIt != m_OpenFiles.end())
+ {
+ if (CacheIt != m_OpenFiles.begin())
+ {
+ auto CachedFile(std::move(CacheIt->second));
+ m_OpenFiles.erase(CacheIt);
+ m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
+ }
+ CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+ return Result;
+ }
+ const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ if (Size == m_LocalContent.RawSizes[LocalPathIndex])
+ {
+ IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
+ return CompositeBuffer(SharedBuffer(Result));
+ }
+ if (m_OpenFiles.size() == m_OpenFiles.capacity())
+ {
+ m_OpenFiles.pop_back();
+ }
+ m_OpenFiles.insert(m_OpenFiles.begin(),
+ std::make_pair(SequenceIndex,
+ std::make_unique<BufferedOpenFile>(LocalFilePath,
+ m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount)));
+ CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+ return Result;
+ }
+
+private:
+ const std::filesystem::path m_Path;
+ const ChunkedFolderContent& m_LocalContent;
+ const ChunkedContentLookup& m_LocalLookup;
+ std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles;
+ DiskStatistics& m_DiskStats;
+};
+
bool
ReadStateObject(BuildOpLogOutput& LogOutput,
CbObjectView StateView,
@@ -1118,6 +1231,8 @@ ZenTempFolderPath(const std::filesystem::path& ZenFolderPath)
return ZenFolderPath / "tmp";
}
+////////////////////// BuildsOperationUpdateFolder
+
BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput,
StorageInstance& Storage,
std::atomic<bool>& AbortFlag,
@@ -1132,8 +1247,7 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput&
const ChunkedContentLookup& RemoteLookup,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
- const Options& Options,
- DiskStatistics& DiskStats)
+ const Options& Options)
: m_LogOutput(LogOutput)
, m_Storage(Storage)
, m_AbortFlag(AbortFlag)
@@ -1149,7 +1263,6 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput&
, m_BlockDescriptions(BlockDescriptions)
, m_LooseChunkHashes(LooseChunkHashes)
, m_Options(Options)
-, m_DiskStats(DiskStats)
, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath))
{
}
@@ -1159,9 +1272,22 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
+ enum TaskSteps : uint32_t
+ {
+ ScanExistingData,
+ WriteChunks,
+ PrepareTarget,
+ FinalizeTarget,
+ StepCount
+ };
+
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
(m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
+ m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount);
+
Stopwatch IndexTimer;
if (!m_Options.IsQuiet)
@@ -1604,6 +1730,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("WriteChunks");
+ m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount);
+
Stopwatch WriteTimer;
FilteredRate FilteredDownloadedBytesPerSecond;
@@ -2853,6 +2981,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
return;
}
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount);
+
tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
@@ -3031,13 +3161,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Stopwatch Timer;
// Clean target folder
- if (!CleanDirectory(m_IOWorkerPool,
- m_AbortFlag,
- m_PauseFlag,
- m_LogOutput,
- m_Options.IsQuiet,
- m_Path,
- m_Options.DefaultExcludeFolders))
+ if (!CleanDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_LogOutput, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders))
{
LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path);
}
@@ -3051,6 +3175,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("FinalizeTree");
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount);
+
Stopwatch Timer;
std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State"));
@@ -3688,9 +3815,9 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
}
FolderContent
-BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats,
- const std::filesystem::path& Path,
- std::span<const std::filesystem::path> PathsToCheck,
+BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& FolderScanStats,
+ const std::filesystem::path& Path,
+ std::span<const std::filesystem::path> PathsToCheck,
std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback)
{
ZEN_TRACE_CPU("GetValidFolderContent");
@@ -3705,7 +3832,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
{
Stopwatch Timer;
- auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
+ auto _ = MakeGuard([&FolderScanStats, &Timer]() { FolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
ParallelWork Work(m_AbortFlag,
m_PauseFlag,
@@ -3717,7 +3844,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
{
uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
Work.ScheduleWork(m_IOWorkerPool,
- [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
+ [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &FolderScanStats](
std::atomic<bool>& AbortFlag) {
if (!AbortFlag)
{
@@ -3734,10 +3861,10 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
Result.Attributes[PathRangeIndex]))
{
Result.Paths[PathRangeIndex] = std::move(FilePath);
- LocalFolderScanStats.FoundFileCount++;
- LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
- LocalFolderScanStats.AcceptedFileCount++;
- LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
+ FolderScanStats.FoundFileCount++;
+ FolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
+ FolderScanStats.AcceptedFileCount++;
+ FolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
}
CompletedPathCount++;
}
@@ -4669,6 +4796,884 @@ BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
}
}
+////////////////////// BuildsOperationUploadFolder
+
+// Captures everything a later Execute() call needs: the log output, the storage instance, the
+// abort/pause flags, the two worker pools, the build and build part identity, the folder to upload,
+// an optional manifest path, whether the build should be created, the build metadata and the
+// upload options.
+//
+// The constructor body is empty - it only initializes members from the arguments and performs no
+// I/O or network access.
+//
+// NOTE(review): the member declarations are not visible here; if any of them are stored as
+// references or views (LogOutput, Storage, AbortFlag, PauseFlag, the worker pools, BuildPartName),
+// the caller has to keep those arguments alive for as long as this object is used - confirm against the header.
+BuildsOperationUploadFolder::BuildsOperationUploadFolder(BuildOpLogOutput& LogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ const std::string_view BuildPartName,
+ const std::filesystem::path& Path,
+ const std::filesystem::path& ManifestPath,
+ bool CreateBuild,
+ const CbObject& MetaData,
+ const Options& Options)
+: m_LogOutput(LogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_BuildPartId(BuildPartId)
+, m_BuildPartName(BuildPartName)
+, m_Path(Path)
+, m_ManifestPath(ManifestPath)
+, m_CreateBuild(CreateBuild)
+, m_MetaData(MetaData)
+, m_Options(Options)
+{
+}
+
+// Uploads the folder m_Path as build part m_BuildPartId of build m_BuildId.
+//
+// Progress is reported through m_LogOutput using the TaskSteps declared below:
+// PrepareBuild - in the background (m_NetworkPool): PutBuild or GetBuild and, unless
+// m_Options.IgnoreExistingBlocks is set, FindBlocks. In the foreground: scan
+// m_Path (or read the manifest) and chunk the resulting files.
+// CalculateDelta - split chunks into loose chunks and block candidates, select known blocks to
+// reuse and arrange the remaining candidates into new blocks.
+// Upload - generate new blocks, store the part manifest with PutBuildPart, upload the
+// attachments the storage asks for, FinalizeBuildPart (at most 5 attempts),
+// FinalizeBuild when m_CreateBuild is set, upload block metadata (at most 3
+// passes) and PutBuildPartStats.
+// Cleanup - reported last; m_Options.TempDir itself is cleaned by a scope guard on exit.
+//
+// Returns early, before PutBuildPart is called, if m_AbortFlag is set once chunking has finished.
+void
+BuildsOperationUploadFolder::Execute()
+{
+ enum TaskSteps : uint32_t
+ {
+ PrepareBuild,
+ CalculateDelta,
+ Upload,
+ // Validate,
+ Cleanup,
+ StepCount
+ };
+
+ // Whatever way we leave this function, report the operation as fully progressed.
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
+ Stopwatch ProcessTimer;
+
+ // Start from an empty temp directory; on exit clean it again and remove it when CleanDirectory returns true.
+ CreateDirectories(m_Options.TempDir);
+ CleanDirectory(m_Options.TempDir, {});
+ auto _ = MakeGuard([&]() {
+ if (CleanDirectory(m_Options.TempDir, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(m_Options.TempDir, DummyEc);
+ }
+ });
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount);
+
+ std::uint64_t TotalRawSize = 0;
+
+ CbObject ChunkerParameters;
+
+ struct PrepareBuildResult
+ {
+ std::vector<ChunkBlockDescription> KnownBlocks;
+ uint64_t PreferredMultipartChunkSize = 0;
+ uint64_t PayloadSize = 0;
+ uint64_t PrepareBuildTimeMs = 0;
+ uint64_t FindBlocksTimeMs = 0;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ // Talk to the build storage on the network pool while the local folder is scanned and chunked below.
+ // The result is collected with PrepBuildResultFuture.get() once chunking is done.
+ // NOTE(review): the task captures `this`, and the early return on abort further down leaves this
+ // function without waiting for the future - confirm that this object (and what it references)
+ // outlives the queued task, or that EnqueueTask's future blocks on destruction.
+ std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask(
+ std::packaged_task<PrepareBuildResult()>{[this] {
+ ZEN_TRACE_CPU("PrepareBuild");
+
+ PrepareBuildResult Result;
+ Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize;
+ Stopwatch Timer;
+ if (m_CreateBuild)
+ {
+ // New build: register it; a "chunkSize" in the response overrides the configured preferred size.
+ ZEN_TRACE_CPU("CreateBuild");
+
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = m_MetaData.GetSize();
+ }
+ else
+ {
+ // Existing build: fetch it; a non-zero "chunkSize" overrides the configured preferred size.
+ // The trace scope is named after the call it measures (GetBuild).
+ ZEN_TRACE_CPU("GetBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (m_Options.AllowMultiparts)
+ {
+ LOG_OUTPUT_WARN(m_LogOutput,
+ "PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
+ }
+
+ if (!m_Options.IgnoreExistingBlocks)
+ {
+ // Ask the storage for blocks that already exist so they can be considered for reuse.
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount);
+ if (BlockDescriptionList)
+ {
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ }
+ m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ ChunkedFolderContent LocalContent;
+
+ {
+ // Rejects a folder when its relative path equals an excluded folder or lies below one
+ // (the excluded name must be followed by '/', so a mere name prefix does not match).
+ auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool {
+ for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
+ {
+ if (RelativePath.starts_with(ExcludeFolder))
+ {
+ if (RelativePath.length() == ExcludeFolder.length())
+ {
+ return false;
+ }
+ else if (RelativePath[ExcludeFolder.length()] == '/')
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ // Rejects a file when its relative path ends with any of the excluded extensions.
+ auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
+ for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
+ {
+ if (RelativePath.ends_with(ExcludeExtension))
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ // Reads a manifest file and returns one path per line. On each line the text up to the first
+ // tab or line break is the path; anything after a tab is ignored. Empty lines are skipped.
+ // A relative ManifestPath is resolved against Path.
+ auto ParseManifest = [](const std::filesystem::path& Path,
+ const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> {
+ std::vector<std::filesystem::path> AssetPaths;
+ std::filesystem::path AbsoluteManifestPath =
+ MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath);
+ IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten();
+ std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize());
+ std::string_view::size_type Offset = 0;
+ while (Offset < ManifestContent.GetSize())
+ {
+ // The path ends at the first tab or line break (or at the end of the file).
+ size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset);
+ if (PathBreakOffset == std::string_view::npos)
+ {
+ PathBreakOffset = ManifestContent.GetSize();
+ }
+ std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset);
+ if (!AssetPath.empty())
+ {
+ AssetPaths.emplace_back(std::filesystem::path(AssetPath));
+ }
+ // Skip the remainder of the line, then any run of tabs/line breaks, to reach the next path.
+ Offset = PathBreakOffset;
+ size_t EolOffset = ManifestString.find_first_of("\r\n", Offset);
+ if (EolOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = EolOffset;
+ size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset);
+ if (LineBreakOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = LineBreakOffset;
+ }
+ return AssetPaths;
+ };
+
+ Stopwatch ScanTimer;
+ FolderContent Content;
+ if (m_ManifestPath.empty())
+ {
+ // No manifest: walk m_Path, applying the folder/extension filters and the optional
+ // exclude manifest. The exclude manifest file itself is never part of the upload.
+ std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName;
+ tsl::robin_set<std::string> ExcludeAssetPaths;
+ if (IsFile(ExcludeManifestPath))
+ {
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath);
+ ExcludeAssetPaths.reserve(AssetPaths.size());
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ ExcludeAssetPaths.insert(AssetPath.generic_string());
+ }
+ }
+ Content = GetFolderContent(
+ m_LocalFolderScanStats,
+ m_Path,
+ std::move(IsAcceptedFolder),
+ [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath,
+ uint64_t Size,
+ uint32_t Attributes) -> bool {
+ if (RelativePath == m_Options.ZenExcludeManifestName)
+ {
+ return false;
+ }
+ if (!IsAcceptedFile(RelativePath, Size, Attributes))
+ {
+ return false;
+ }
+ if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
+ {
+ return false;
+ }
+ return true;
+ },
+ m_IOWorkerPool,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool, std::ptrdiff_t) {
+ LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path);
+ },
+ m_AbortFlag);
+ }
+ else
+ {
+ // Manifest given: upload exactly the listed files, plus the manifest itself when it was
+ // given as a relative path. Scan statistics are filled in by hand since no scan happens.
+ Stopwatch ManifestParseTimer;
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath);
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ Content.Paths.push_back(AssetPath);
+ const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
+#if ZEN_PLATFORM_WINDOWS
+ Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ Content.Attributes.push_back(GetFileMode(AssetFilePath));
+#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ if (m_ManifestPath.is_relative())
+ {
+ Content.Paths.push_back(m_ManifestPath);
+ const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
+#if ZEN_PLATFORM_WINDOWS
+ Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ Content.Attributes.push_back(GetFileMode(ManifestFilePath));
+#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount);
+ m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount);
+ m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
+ }
+
+ // Record the chunker name and parameters; they are written into the part manifest later.
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
+ {
+ CbObjectWriter ChunkParametersWriter;
+ ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
+ ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters());
+ ChunkerParameters = ChunkParametersWriter.Save();
+ }
+
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+
+ {
+ // Chunk and hash the files on the IO pool while reporting progress through a progress bar.
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ LocalContent = ChunkFolderContent(
+ m_ChunkingStats,
+ m_IOWorkerPool,
+ m_Path,
+ Content,
+ *ChunkController,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ m_ChunkingStats.FilesProcessed.load(),
+ Content.Paths.size(),
+ NiceBytes(m_ChunkingStats.BytesHashed.load()),
+ NiceBytes(TotalRawSize),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()));
+ Progress.UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = TotalRawSize,
+ .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ m_AbortFlag,
+ m_PauseFlag);
+ FilteredBytesHashed.Stop();
+ Progress.Finish();
+ if (m_AbortFlag)
+ {
+ // Aborted during chunking: nothing has been sent to PutBuildPart yet, so just leave.
+ return;
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
+ LocalContent.Paths.size(),
+ NiceBytes(TotalRawSize),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()),
+ m_Path,
+ NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
+ NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed)));
+ }
+ }
+
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+
+ std::vector<size_t> ReuseBlockIndexes;
+ std::vector<uint32_t> NewBlockChunkIndexes;
+
+ // Wait for the background prepare task; the known blocks and chunk size are needed from here on.
+ PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get();
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Build prepare took {}. {} took {}, payload size {}{}",
+ NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
+ m_CreateBuild ? "PutBuild" : "GetBuild",
+ NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
+ NiceBytes(PrepBuildResult.PayloadSize),
+ m_Options.IgnoreExistingBlocks ? ""
+ : fmt::format(". Found {} blocks in {}",
+ PrepBuildResult.KnownBlocks.size(),
+ NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
+ }
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount);
+
+ // Size threshold handed to UploadPartBlobs: four preferred multipart chunks when multipart
+ // uploads are allowed, otherwise the largest representable value.
+ const std::uint64_t LargeAttachmentSize =
+ m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
+
+ Stopwatch BlockArrangeTimer;
+
+ std::vector<std::uint32_t> LooseChunkIndexes;
+ {
+ // Empty chunks and chunks larger than MaxChunkEmbedSize stay loose; all others are block candidates.
+ bool EnableBlocks = true;
+ std::vector<std::uint32_t> BlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
+ {
+ const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize)
+ {
+ LooseChunkIndexes.push_back(ChunkIndex);
+ m_LooseChunksStats.ChunkByteCount += ChunkRawSize;
+ }
+ else
+ {
+ BlockChunkIndexes.push_back(ChunkIndex);
+ m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
+ }
+ }
+ m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size();
+ m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size();
+
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store");
+ }
+ NewBlockChunkIndexes = std::move(BlockChunkIndexes);
+ }
+ else
+ {
+ // Pick known blocks to reuse; candidates not covered by them end up in NewBlockChunkIndexes.
+ ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks,
+ LocalContent.ChunkedContent.ChunkHashes,
+ BlockChunkIndexes,
+ NewBlockChunkIndexes);
+ m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size();
+
+ for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks)
+ {
+ for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
+ {
+ m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
+ }
+ m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
+ }
+ }
+ }
+
+ std::vector<std::vector<uint32_t>> NewBlockChunks;
+ ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks);
+
+ m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size();
+ for (uint32_t ChunkIndex : NewBlockChunkIndexes)
+ {
+ m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
+
+ // Both percentages guard against a zero denominator.
+ const double AcceptedByteCountPercent =
+ m_FindBlocksStats.PotentialChunkByteCount > 0
+ ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount)
+ : 0.0;
+
+ const double AcceptedReduntantByteCountPercent =
+ m_FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) /
+ (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount)
+ : 0.0;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
+ " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
+ " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
+ " Rejected {} ({}) chunks in {} blocks\n"
+ " Arranged {} ({}) chunks in {} new blocks\n"
+ " Keeping {} ({}) chunks as loose chunks\n"
+ " Discovery completed in {}",
+ m_FindBlocksStats.FoundBlockChunkCount,
+ m_FindBlocksStats.FoundBlockCount,
+ NiceBytes(m_FindBlocksStats.FoundBlockByteCount),
+ NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS),
+
+ m_FindBlocksStats.AcceptedChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedRawByteCount),
+ m_FindBlocksStats.AcceptedBlockCount,
+ AcceptedByteCountPercent,
+
+ m_FindBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount),
+ AcceptedReduntantByteCountPercent,
+
+ m_FindBlocksStats.RejectedChunkCount,
+ NiceBytes(m_FindBlocksStats.RejectedByteCount),
+ m_FindBlocksStats.RejectedBlockCount,
+
+ m_FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount),
+ m_FindBlocksStats.NewBlocksCount,
+
+ m_LooseChunksStats.ChunkCount,
+ NiceBytes(m_LooseChunksStats.ChunkByteCount),
+
+ NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
+ }
+
+ GeneratedBlocks NewBlocks;
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount);
+
+ if (!NewBlockChunks.empty())
+ {
+ // Build the new blocks; the guard logs the generate/upload summary when this scope is left.
+ Stopwatch GenerateBuildBlocksTimer;
+ auto __ = MakeGuard([&]() {
+ uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
+ m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount),
+ m_UploadStats.BlockCount.load(),
+ NiceBytes(m_UploadStats.BlocksBytes.load()),
+ NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
+ NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ m_GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8)));
+ }
+ });
+ GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks);
+ }
+
+ CbObject PartManifest;
+ {
+ // Assemble the part manifest: chunker parameters, file/chunk content, loose chunks and the
+ // hashes of all blocks (reused blocks first, then new blocks).
+ CbObjectWriter PartManifestWriter;
+ Stopwatch ManifestGenerationTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated build part manifest in {} ({})",
+ NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifestWriter.GetSaveSize()));
+ }
+ });
+ PartManifestWriter.AddObject("chunker"sv, ChunkerParameters);
+
+ std::vector<IoHash> AllChunkBlockHashes;
+ std::vector<ChunkBlockDescription> AllChunkBlockDescriptions;
+ AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ for (size_t ReuseBlockIndex : ReuseBlockIndexes)
+ {
+ AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]);
+ AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash);
+ }
+ AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(),
+ NewBlocks.BlockDescriptions.begin(),
+ NewBlocks.BlockDescriptions.end());
+ for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
+ {
+ AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
+ }
+#if EXTRA_VERIFY
+ // NOTE(review): this debug-only code reads Block.ChunkHashes while the rest of this function
+ // uses ChunkBlockDescription::ChunkRawHashes - confirm it still compiles with EXTRA_VERIFY enabled.
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
+ std::vector<IoHash> AbsoluteChunkHashes;
+ AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+ for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
+ {
+ for (const IoHash& ChunkHash : Block.ChunkHashes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(ChunkHash);
+ }
+ }
+ for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
+ }
+ for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
+ LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
+ }
+#endif // EXTRA_VERIFY
+ std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkOrders,
+ LocalLookup.ChunkHashToChunkIndex,
+ LooseChunkIndexes,
+ AllChunkBlockDescriptions);
+
+#if EXTRA_VERIFY
+ for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
+ uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
+ const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
+ ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+ }
+#endif // EXTRA_VERIFY
+
+ WriteBuildContentToCompactBinary(PartManifestWriter,
+ LocalContent.Platform,
+ LocalContent.Paths,
+ LocalContent.RawHashes,
+ LocalContent.RawSizes,
+ LocalContent.Attributes,
+ LocalContent.ChunkedContent.SequenceRawHashes,
+ LocalContent.ChunkedContent.ChunkCounts,
+ LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkRawSizes,
+ AbsoluteChunkOrders,
+ LooseChunkIndexes,
+ AllChunkBlockHashes);
+
+#if EXTRA_VERIFY
+ {
+ // Debug-only round trip: read the manifest back and check that it matches LocalContent.
+ ChunkedFolderContent VerifyFolderContent;
+
+ std::vector<uint32_t> OutAbsoluteChunkOrders;
+ std::vector<IoHash> OutLooseChunkHashes;
+ std::vector<uint64_t> OutLooseChunkRawSizes;
+ std::vector<IoHash> OutBlockRawHashes;
+ ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
+ VerifyFolderContent.Platform,
+ VerifyFolderContent.Paths,
+ VerifyFolderContent.RawHashes,
+ VerifyFolderContent.RawSizes,
+ VerifyFolderContent.Attributes,
+ VerifyFolderContent.ChunkedContent.SequenceRawHashes,
+ VerifyFolderContent.ChunkedContent.ChunkCounts,
+ OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ OutBlockRawHashes);
+ ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ }
+
+ CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ AllChunkBlockDescriptions,
+ VerifyFolderContent.ChunkedContent.ChunkHashes,
+ VerifyFolderContent.ChunkedContent.ChunkRawSizes,
+ VerifyFolderContent.ChunkedContent.ChunkOrders);
+
+ ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
+ ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
+ ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
+ ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
+ uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+ }
+ }
+#endif // EXTRA_VERIFY
+ PartManifest = PartManifestWriter.Save();
+ }
+
+ // Store the part manifest; the second element of the result lists the attachments the storage still needs.
+ Stopwatch PutBuildPartResultTimer;
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "PutBuildPart took {}, payload size {}. {} attachments are needed.",
+ NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifest.GetSize()),
+ PutBuildPartResult.second.size());
+ }
+ IoHash PartHash = PutBuildPartResult.first;
+
+ // Uploads the given attachments (skipped entirely when aborted), logs a summary for this round
+ // and folds the round's statistics into m_UploadStats / m_LooseChunksStats.
+ auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize](
+ std::span<IoHash> RawHashes) {
+ if (!m_AbortFlag)
+ {
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
+
+ Stopwatch TempUploadTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded {} ({}) blocks. "
+ "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
+ "Transferred {} ({}bits/s) in {}",
+ TempUploadStats.BlockCount.load(),
+ NiceBytes(TempUploadStats.BlocksBytes),
+
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
+ TempLooseChunksStats.ChunkByteCount)),
+ TempUploadStats.ChunkCount.load(),
+ NiceBytes(TempUploadStats.ChunksBytes),
+
+ // The transfer rate covers blocks and chunks, matching the byte total printed next to it.
+ NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS,
+ (TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes) * 8)),
+ NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
+ }
+ });
+ UploadPartBlobs(LocalContent,
+ LocalLookup,
+ RawHashes,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ LargeAttachmentSize,
+ TempUploadStats,
+ TempLooseChunksStats);
+ m_UploadStats += TempUploadStats;
+ m_LooseChunksStats += TempLooseChunksStats;
+ }
+ };
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ // Forced upload: send every loose chunk plus each new block that still has a block header,
+ // regardless of what PutBuildPart reported as needed.
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "PutBuildPart uploading all attachments, needs are: {}",
+ FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+
+ std::vector<IoHash> ForceUploadChunkHashes;
+ ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
+
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
+ {
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ // Block was not uploaded during generation
+ ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ }
+ }
+ UploadAttachments(ForceUploadChunkHashes);
+ }
+ else if (!PutBuildPartResult.second.empty())
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+ UploadAttachments(PutBuildPartResult.second);
+ }
+
+ // FinalizeBuildPart returns the attachments that are still missing; upload them and try again,
+ // at most 5 times. The loop also stops as soon as the operation is aborted.
+ uint32_t FinalizeBuildPartRetryCount = 5;
+ while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
+ {
+ Stopwatch FinalizeBuildPartTimer;
+ std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "FinalizeBuildPart took {}. {} attachments are missing.",
+ NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
+ Needs.size());
+ }
+ if (Needs.empty())
+ {
+ break;
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
+ }
+ UploadAttachments(Needs);
+ }
+
+ // Only a build created by this operation is finalized here.
+ if (m_CreateBuild && !m_AbortFlag)
+ {
+ Stopwatch FinalizeBuildTimer;
+ m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
+ }
+ }
+
+ if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag)
+ {
+ uint64_t UploadBlockMetadataCount = 0;
+ Stopwatch UploadBlockMetadataTimer;
+
+ // Up to 3 passes over the new blocks; each pass only touches blocks whose metadata has not
+ // been stored yet and the loop ends early once a pass has no failures.
+ // FailedMetadataUploadCount starts at 1 purely to enter the loop.
+ uint32_t FailedMetadataUploadCount = 1;
+ int32_t MetadataUploadRetryCount = 3;
+ while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
+ {
+ FailedMetadataUploadCount = 0;
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
+ {
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ UploadBlockMetadataCount++;
+ }
+ else
+ {
+ FailedMetadataUploadCount++;
+ }
+ }
+ }
+ }
+ if (UploadBlockMetadataCount > 0)
+ {
+ uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
+ m_UploadStats.ElapsedWallTimeUS += ElapsedUS;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded metadata for {} blocks in {}",
+ UploadBlockMetadataCount,
+ NiceTimeSpanMs(ElapsedUS / 1000));
+ }
+ }
+ }
+
+ // if (m_PostUploadVerify && !m_AbortFlag)
+ // {
+ // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount);
+ // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
+ // std::string ValidateInfo;
+ // if (m_PostUploadVerify)
+ // {
+ // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ // const uint64_t DownloadedByteCount =
+ // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}",
+ // DownloadedCount,
+ // NiceBytes(DownloadedByteCount),
+ // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ // }
+ // }
+
+ // Publish summary statistics for this build part to the storage.
+ m_Storage.BuildStorage->PutBuildPartStats(
+ m_BuildId,
+ m_BuildPartId,
+ {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())},
+ {"reusedRatio", AcceptedByteCountPercent / 100.0},
+ {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)},
+ {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)},
+ {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)},
+ {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)},
+ {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())},
+ {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())},
+ {"uploadedBytesPerSec",
+ double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))},
+ {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}});
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+}
+
void
DownloadLargeBlob(BuildStorage& Storage,
const std::filesystem::path& DownloadFolder,
@@ -4739,4 +5744,1390 @@ DownloadLargeBlob(BuildStorage& Storage,
}
}
+/// Selects which of the known blocks are worth referencing for the wanted chunks.
+///
+/// Selection runs in two passes:
+///  1. Every usable known block is scored by the share of its compressed bytes that
+///     belongs to a wanted chunk. Blocks scoring at least
+///     m_Options.BlockReuseMinPercentLimit become candidates.
+///  2. Candidates are visited in descending order of matched bytes. Each one is
+///     re-scored counting only chunks that no previously accepted block has already
+///     claimed, and is accepted only if it still reaches the limit.
+///
+/// @param KnownBlocks            Block descriptions to choose from.
+/// @param ChunkHashes            Hash table indexed by the values in ChunkIndexes.
+/// @param ChunkIndexes           Indexes into ChunkHashes of the chunks to place.
+/// @param OutUnusedChunkIndexes  Receives (appended, in ChunkIndexes order) every chunk
+///                               not covered by an accepted block. Left untouched when
+///                               ChunkHashes is empty.
+/// @return Indexes into KnownBlocks of the accepted blocks, in acceptance order.
+///
+/// The accepted/rejected counters in m_FindBlocksStats are updated as a side effect.
+std::vector<size_t>
+BuildsOperationUploadFolder::FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks,
+                                             std::span<const IoHash> ChunkHashes,
+                                             std::span<const uint32_t> ChunkIndexes,
+                                             std::vector<uint32_t>& OutUnusedChunkIndexes)
+{
+    ZEN_TRACE_CPU("FindReuseBlocks");
+
+    std::vector<size_t> FilteredReuseBlockIndexes;
+
+    const uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size());
+    if (ChunkCount == 0)
+    {
+        return FilteredReuseBlockIndexes;
+    }
+
+    std::vector<bool> ChunkFound(ChunkCount, false);
+
+    // Chunks claimed by blocks accepted during this call. Tracked locally so the
+    // reserve() below does not depend on whatever m_FindBlocksStats held on entry.
+    size_t AcceptedChunkCount = 0;
+
+    if (!KnownBlocks.empty())
+    {
+        tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+        ChunkHashToChunkIndex.reserve(ChunkIndexes.size());
+        for (uint32_t ChunkIndex : ChunkIndexes)
+        {
+            ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex);
+        }
+
+        // Matched compressed bytes per known block; only filled in for candidates.
+        std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0);
+        std::vector<size_t> ReuseBlockIndexes;
+
+        // Pass 1: pick out the blocks whose usage is at or above the limit.
+        for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
+        {
+            const ChunkBlockDescription& KnownBlock      = KnownBlocks[KnownBlockIndex];
+            const size_t                 BlockChunkCount = KnownBlock.ChunkRawHashes.size();
+
+            const bool IsUsable = KnownBlock.BlockHash != IoHash::Zero &&
+                                  BlockChunkCount == KnownBlock.ChunkCompressedLengths.size() && BlockChunkCount > 0;
+            if (!IsUsable)
+            {
+                continue;
+            }
+
+            size_t ReuseSize            = 0;
+            size_t BlockSize            = 0;
+            size_t FoundAttachmentCount = 0;
+            for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++)
+            {
+                const IoHash&  BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                BlockSize += BlockChunkSize;
+                if (ChunkHashToChunkIndex.contains(BlockChunkHash))
+                {
+                    ReuseSize += BlockChunkSize;
+                    FoundAttachmentCount++;
+                }
+            }
+
+            if (BlockSize == 0)
+            {
+                // Every chunk reports a compressed length of zero; the usage percentage
+                // is undefined (division by zero), so the block is skipped.
+                continue;
+            }
+
+            const size_t ReusePercent = (ReuseSize * 100) / BlockSize;
+
+            if (ReusePercent >= m_Options.BlockReuseMinPercentLimit)
+            {
+                if (m_Options.IsVerbose)
+                {
+                    LOG_OUTPUT(m_LogOutput,
+                               "Reusing block {}. {} attachments found, usage level: {}%",
+                               KnownBlock.BlockHash,
+                               FoundAttachmentCount,
+                               ReusePercent);
+                }
+                ReuseBlockIndexes.push_back(KnownBlockIndex);
+                BlockUseSize[KnownBlockIndex] = ReuseSize;
+            }
+            else if (FoundAttachmentCount > 0)
+            {
+                m_FindBlocksStats.RejectedBlockCount++;
+                m_FindBlocksStats.RejectedChunkCount += FoundAttachmentCount;
+                m_FindBlocksStats.RejectedByteCount += ReuseSize;
+            }
+        }
+
+        // Pass 2: most-used candidates first, re-scored against chunks still unclaimed.
+        std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&BlockUseSize](size_t Lhs, size_t Rhs) {
+            return BlockUseSize[Lhs] > BlockUseSize[Rhs];
+        });
+
+        for (size_t KnownBlockIndex : ReuseBlockIndexes)
+        {
+            const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+
+            std::vector<uint32_t> FoundChunkIndexes;
+            size_t                BlockSize            = 0;
+            size_t                AdjustedReuseSize    = 0;
+            size_t                AdjustedRawReuseSize = 0;
+            for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++)
+            {
+                const IoHash&  BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                BlockSize += BlockChunkSize;
+                if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end())
+                {
+                    const uint32_t ChunkIndex = It->second;
+                    if (!ChunkFound[ChunkIndex])
+                    {
+                        FoundChunkIndexes.push_back(ChunkIndex);
+                        AdjustedReuseSize += BlockChunkSize;
+                        // Pass 1 only validated ChunkCompressedLengths against the hash
+                        // count, so ChunkRawLengths is bounds-checked before it is read.
+                        if (BlockChunkIndex < KnownBlock.ChunkRawLengths.size())
+                        {
+                            AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex];
+                        }
+                    }
+                }
+            }
+
+            // BlockSize is non-zero here: pass 1 rejected blocks with a zero total.
+            const size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize;
+
+            if (ReusePercent >= m_Options.BlockReuseMinPercentLimit)
+            {
+                if (m_Options.IsVerbose)
+                {
+                    LOG_OUTPUT(m_LogOutput,
+                               "Reusing block {}. {} attachments found, usage level: {}%",
+                               KnownBlock.BlockHash,
+                               FoundChunkIndexes.size(),
+                               ReusePercent);
+                }
+                FilteredReuseBlockIndexes.push_back(KnownBlockIndex);
+
+                for (uint32_t ChunkIndex : FoundChunkIndexes)
+                {
+                    ChunkFound[ChunkIndex] = true;
+                }
+                AcceptedChunkCount += FoundChunkIndexes.size();
+                m_FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size();
+                m_FindBlocksStats.AcceptedByteCount += AdjustedReuseSize;
+                m_FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize;
+                m_FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size();
+                m_FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize;
+            }
+            else
+            {
+                m_FindBlocksStats.RejectedBlockCount++;
+                m_FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size();
+                m_FindBlocksStats.RejectedByteCount += AdjustedReuseSize;
+            }
+        }
+    }
+
+    // reserve() is only a capacity hint; clamp so the unsigned subtraction cannot wrap.
+    const size_t WantedChunkCount = ChunkIndexes.size();
+    OutUnusedChunkIndexes.reserve(WantedChunkCount > AcceptedChunkCount ? WantedChunkCount - AcceptedChunkCount : 0);
+    for (uint32_t ChunkIndex : ChunkIndexes)
+    {
+        if (!ChunkFound[ChunkIndex])
+        {
+            OutUnusedChunkIndexes.push_back(ChunkIndex);
+        }
+    }
+
+    return FilteredReuseBlockIndexes;
+}
+
+/// Orders the given chunks by source location and partitions them into blocks.
+///
+/// ChunkIndexes is sorted in place by (SequenceIndex, Offset) of each chunk's first
+/// recorded location and then cut into consecutive runs. A run is closed when adding
+/// the next chunk would push its raw size past BlockParameters.MaxBlockSize, or when
+/// the run holds more than BlockParameters.MaxChunksPerBlock chunks. Before a run is
+/// closed, its tail is scanned backwards (without dropping below 15/16 of MaxBlockSize)
+/// for a chunk that starts in a different sequence than the chunk that did not fit; if
+/// one is found, the run is cut right after it so the later chunks move to the next run.
+///
+/// @param Content       Supplies the raw size of every chunk.
+/// @param Lookup        Supplies the sequence locations of every chunk.
+/// @param ChunkIndexes  Chunks to arrange; reordered in place.
+/// @param OutBlocks     Receives one chunk-index list per produced block (appended).
+void
+BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
+                                                     const ChunkedContentLookup& Lookup,
+                                                     std::vector<uint32_t>& ChunkIndexes,
+                                                     std::vector<std::vector<uint32_t>>& OutBlocks)
+{
+    ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
+
+    std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Lookup](uint32_t Lhs, uint32_t Rhs) {
+        const ChunkedContentLookup::ChunkSequenceLocation& First  = GetChunkSequenceLocations(Lookup, Lhs)[0];
+        const ChunkedContentLookup::ChunkSequenceLocation& Second = GetChunkSequenceLocations(Lookup, Rhs)[0];
+        if (First.SequenceIndex != Second.SequenceIndex)
+        {
+            return First.SequenceIndex < Second.SequenceIndex;
+        }
+        return First.Offset < Second.Offset;
+    });
+
+    // Sequence that holds the first recorded location of a chunk.
+    const auto FirstSequenceIndexOf = [&Lookup](uint32_t Chunk) -> uint32_t {
+        return Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[Chunk]].SequenceIndex;
+    };
+
+    const uint64_t LowSizeThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16);
+
+    uint64_t RunSize  = 0;
+    uint32_t RunBegin = 0;
+    uint32_t Cursor   = 0;
+    while (Cursor < ChunkIndexes.size())
+    {
+        const uint32_t ChunkIndex = ChunkIndexes[Cursor];
+        const uint64_t ChunkSize  = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+        const bool ExceedsSize = (RunSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize;
+        // NOTE(review): this is a strict '>' so a run can grow to MaxChunksPerBlock + 1
+        // chunks before it is closed - confirm that is intended.
+        const bool ExceedsCount = (Cursor - RunBegin) > m_Options.BlockParameters.MaxChunksPerBlock;
+
+        if (!ExceedsSize && !ExceedsCount)
+        {
+            RunSize += ChunkSize;
+            Cursor++;
+            continue;
+        }
+
+        // The chunk at Cursor does not fit, so the current run must already hold chunks.
+        ZEN_ASSERT(Cursor > RunBegin);
+
+        // Walk back from the end of the run looking for a boundary between source
+        // sequences; stop once removing more would shrink the run below the threshold.
+        const uint32_t OverflowSequenceIndex = FirstSequenceIndexOf(ChunkIndex);
+        uint64_t       TrimmedSize           = RunSize;
+        for (uint32_t Probe = Cursor - 1; Probe > (RunBegin + 2); Probe--)
+        {
+            const uint32_t ProbeChunkIndex = ChunkIndexes[Probe];
+            const uint64_t ProbeChunkSize  = Content.ChunkedContent.ChunkRawSizes[ProbeChunkIndex];
+            if ((TrimmedSize - ProbeChunkSize) < LowSizeThreshold)
+            {
+                break;
+            }
+            if (OverflowSequenceIndex != FirstSequenceIndexOf(ProbeChunkIndex))
+            {
+                Cursor = Probe + 1;
+                break;
+            }
+            TrimmedSize -= ProbeChunkSize;
+        }
+
+        OutBlocks.emplace_back(ChunkIndexes.begin() + RunBegin, ChunkIndexes.begin() + Cursor);
+        RunSize  = 0;
+        RunBegin = Cursor;
+    }
+
+    // Whatever is left forms the final block.
+    if (RunBegin < ChunkIndexes.size())
+    {
+        OutBlocks.emplace_back(ChunkIndexes.begin() + RunBegin, ChunkIndexes.end());
+    }
+}
+
+// Generates one compressed block for every entry of NewBlockChunks and records the result in OutBlocks.
+//
+// Generation work is scheduled on m_IOWorkerPool. After a block has been generated its upload is
+// scheduled on m_NetworkPool unless more than 16 uploads are already pending. An upload task that
+// only starts after every block has been generated does not upload; it just stores the block header.
+// For every block the description, compressed size, metadata object and header are written to
+// OutBlocks at the block's index; MetaDataHasBeenUploaded is set only when PutBlockMetadata succeeds.
+// Updates m_GenerateBlocksStats and m_UploadStats, and reports progress through m_LogOutput.
+// Does nothing when NewBlockChunks is empty.
+void
+BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& OutBlocks)
+{
+ ZEN_TRACE_CPU("GenerateBuildBlocks");
+ const std::size_t NewBlockCount = NewBlockChunks.size();
+ if (NewBlockCount > 0)
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Generate Blocks"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ // Size every per-block output up front so each task only writes to its own BlockIndex slot.
+ OutBlocks.BlockDescriptions.resize(NewBlockCount);
+ OutBlocks.BlockSizes.resize(NewBlockCount);
+ OutBlocks.BlockMetaDatas.resize(NewBlockCount);
+ OutBlocks.BlockHeaders.resize(NewBlockCount);
+ OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0);
+ OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);
+
+ // Guards OutBlocks.BlockHashToBlockIndex, the one output shared between tasks.
+ RwLock Lock;
+
+ WorkerThreadPool& GenerateBlobsPool = m_IOWorkerPool;
+ WorkerThreadPool& UploadBlocksPool = m_NetworkPool;
+
+ FilteredRate FilteredGeneratedBytesPerSecond;
+ FilteredRate FilteredUploadedBytesPerSecond;
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ // Number of upload tasks scheduled but not yet finished; used to throttle eager uploads.
+ std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
+
+ for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
+ {
+ if (Work.IsAborted())
+ {
+ break;
+ }
+ const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
+ // Note: ChunksInBlock is listed without '&' below, so each task captures its own copy of the vector.
+ Work.ScheduleWork(
+ GenerateBlobsPool,
+ [this,
+ &Content,
+ &Lookup,
+ &Work,
+ &UploadBlocksPool,
+ NewBlockCount,
+ ChunksInBlock,
+ &Lock,
+ &OutBlocks,
+ &FilteredGeneratedBytesPerSecond,
+ &QueuedPendingBlocksForUpload,
+ &FilteredUploadedBytesPerSecond,
+ BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
+
+ FilteredGeneratedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
+
+ Stopwatch GenerateTimer;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex]);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated block {} ({}) containing {} chunks in {}",
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(CompressedBlock.GetCompressedSize()),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ }
+
+ OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("createdBy", "zen");
+ OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
+ }
+ m_GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
+ m_GenerateBlocksStats.GeneratedBlockCount++;
+
+ Lock.WithExclusiveLock([&]() {
+ OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
+ });
+
+ // Always keep the first two segments of the compressed block as its header.
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+
+ if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+ {
+ FilteredGeneratedBytesPerSecond.Stop();
+ }
+
+ if (QueuedPendingBlocksForUpload.load() > 16)
+ {
+ // Too many uploads already pending: no upload task is scheduled for this block.
+ // This repeats the header assignment made just above from the same segments.
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ if (!m_AbortFlag)
+ {
+ QueuedPendingBlocksForUpload++;
+
+ // The compressed block is moved into the upload task as Payload.
+ Work.ScheduleWork(
+ UploadBlocksPool,
+ [this,
+ NewBlockCount,
+ &FilteredUploadedBytesPerSecond,
+ &QueuedPendingBlocksForUpload,
+ &OutBlocks,
+ BlockIndex,
+ Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+ // Balances the increment made when this task was scheduled, on every exit path.
+ auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
+ if (!m_AbortFlag)
+ {
+ if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+ {
+ // All blocks have been generated: skip the upload and only store the header.
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+
+ FilteredUploadedBytesPerSecond.Stop();
+ std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+
+ FilteredUploadedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
+
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
+ OutBlocks.BlockMetaDatas[BlockIndex]);
+
+ const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ // Read the size before Payload is moved from in the PutBuildBlob call below.
+ const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+
+ // The cache storage is optional; when present it receives the blob first.
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
+
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
+ m_UploadStats.BlocksBytes += CompressedBlockSize;
+
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(CompressedBlockSize),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ }
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+
+ bool MetadataSucceeded =
+ m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded block {} metadata ({})",
+ BlockHash,
+ NiceBytes(BlockMetaData.GetSize()));
+ }
+
+ OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ // The metadata size is added to the same counter as the block payload bytes.
+ m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ }
+
+ m_UploadStats.BlockCount++;
+ if (m_UploadStats.BlockCount == NewBlockCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+ }
+
+ // Wait for all scheduled work; the callback refreshes the rate filters and the progress bar.
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+
+ FilteredGeneratedBytesPerSecond.Update(m_GenerateBlocksStats.GeneratedBlockByteCount.load());
+ FilteredUploadedBytesPerSecond.Update(m_UploadStats.BlocksBytes.load());
+
+ // The upload rate is multiplied by 8 because it is displayed in bits per second.
+ std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
+ m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NewBlockCount,
+ NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()),
+ m_UploadStats.BlockCount.load(),
+ NewBlockCount,
+ NiceBytes(m_UploadStats.BlocksBytes.load()),
+ NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8));
+
+ Progress.UpdateState({.Task = "Generating blocks",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
+ .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - m_GenerateBlocksStats.GeneratedBlockCount.load()),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ // Every scheduled upload task decrements the counter on exit, so it must be back at zero unless aborted.
+ ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0);
+
+ Progress.Finish();
+
+ m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+ m_UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ }
+}
+
+/// Translates a chunk order expressed in local chunk indexes into absolute chunk indexes.
+///
+/// The absolute index space is laid out as: the loose chunks first, in LooseChunkIndexes
+/// order, followed by every chunk of every block in BlockDescriptions order (block chunks
+/// that are not known locally still occupy a slot).
+///
+/// @param LocalChunkHashes            Hash of every local chunk, indexed by local chunk index.
+/// @param LocalChunkOrder             Sequence of local chunk indexes to translate.
+/// @param ChunkHashToLocalChunkIndex  Lookup from chunk hash to local chunk index.
+/// @param LooseChunkIndexes           Local indexes of the chunks stored outside blocks.
+/// @param BlockDescriptions           Blocks whose chunks follow the loose chunks.
+/// @return One absolute chunk index per entry of LocalChunkOrder, in the same order.
+///
+/// If a hash is present in several places, the last assignment wins (a block entry
+/// overrides a loose entry, a later block overrides an earlier one). A local chunk that
+/// is neither loose nor part of any block is translated to (uint32_t)-1.
+std::vector<uint32_t>
+BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders(
+    const std::span<const IoHash> LocalChunkHashes,
+    const std::span<const uint32_t> LocalChunkOrder,
+    const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
+    const std::span<const uint32_t>& LooseChunkIndexes,
+    const std::span<const ChunkBlockDescription>& BlockDescriptions)
+{
+    ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders");
+
+#if EXTRA_VERIFY
+    std::vector<IoHash> TmpAbsoluteChunkHashes;
+    TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size());
+#endif  // EXTRA_VERIFY
+    std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex;
+    LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1);
+    std::uint32_t AbsoluteChunkCount = 0;
+
+    // Loose chunks occupy the first absolute slots.
+    for (uint32_t ChunkIndex : LooseChunkIndexes)
+    {
+        LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount;
+#if EXTRA_VERIFY
+        TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]);
+#endif  // EXTRA_VERIFY
+        AbsoluteChunkCount++;
+    }
+
+    // Block chunks follow; every block chunk consumes a slot whether or not it is known locally.
+    for (const ChunkBlockDescription& Block : BlockDescriptions)
+    {
+        for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+        {
+            if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end())
+            {
+                const uint32_t LocalChunkIndex = It->second;
+                ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash);
+                LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount;
+            }
+#if EXTRA_VERIFY
+            TmpAbsoluteChunkHashes.push_back(ChunkHash);
+#endif  // EXTRA_VERIFY
+            AbsoluteChunkCount++;
+        }
+    }
+
+    std::vector<uint32_t> AbsoluteChunkOrder;
+    // One output entry is produced per order entry, so size the buffer from the order
+    // length rather than from the number of distinct chunks.
+    AbsoluteChunkOrder.reserve(LocalChunkOrder.size());
+    for (const uint32_t LocalChunkIndex : LocalChunkOrder)
+    {
+        const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex];
+#if EXTRA_VERIFY
+        ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]);
+#endif  // EXTRA_VERIFY
+        AbsoluteChunkOrder.push_back(AbsoluteChunkIndex);
+    }
+#if EXTRA_VERIFY
+    {
+        uint32_t OrderIndex = 0;
+        while (OrderIndex < LocalChunkOrder.size())
+        {
+            const uint32_t LocalChunkIndex    = LocalChunkOrder[OrderIndex];
+            const IoHash&  LocalChunkHash     = LocalChunkHashes[LocalChunkIndex];
+            const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex];
+            const IoHash&  AbsoluteChunkHash  = TmpAbsoluteChunkHashes[AbsoluteChunkIndex];
+            ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+            OrderIndex++;
+        }
+    }
+#endif  // EXTRA_VERIFY
+    return AbsoluteChunkOrder;
+}
+
+/// Rebuilds local chunk tables from a chunk order expressed in absolute chunk indexes.
+///
+/// The absolute tables are the loose chunk hashes/sizes followed by the hashes/raw
+/// lengths of every block in BlockDescriptions order. AbsoluteChunkOrders is then walked
+/// once; the first time a hash is seen it is given the next local index and its hash and
+/// raw size are appended to the output tables, later occurrences reuse that index.
+///
+/// @param AbsoluteChunkOrders    Sequence of indexes into the absolute tables.
+/// @param LooseChunkHashes       Hashes of the loose chunks.
+/// @param LooseChunkRawSizes     Raw sizes of the loose chunks.
+/// @param BlockDescriptions      Blocks whose chunks follow the loose chunks.
+/// @param OutLocalChunkHashes    Receives one hash per distinct referenced chunk.
+/// @param OutLocalChunkRawSizes  Receives the raw size matching each output hash.
+/// @param OutLocalChunkOrders    Receives one local index per entry of AbsoluteChunkOrders.
+void
+BuildsOperationUploadFolder::CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders,
+                                                       const std::span<const IoHash> LooseChunkHashes,
+                                                       const std::span<const uint64_t> LooseChunkRawSizes,
+                                                       const std::span<const ChunkBlockDescription>& BlockDescriptions,
+                                                       std::vector<IoHash>& OutLocalChunkHashes,
+                                                       std::vector<uint64_t>& OutLocalChunkRawSizes,
+                                                       std::vector<uint32_t>& OutLocalChunkOrders)
+{
+    ZEN_TRACE_CPU("CalculateLocalChunkOrders");
+
+    // Flatten loose chunks + block chunks into the absolute tables.
+    std::vector<IoHash>   AllHashes(LooseChunkHashes.begin(), LooseChunkHashes.end());
+    std::vector<uint64_t> AllRawSizes(LooseChunkRawSizes.begin(), LooseChunkRawSizes.end());
+    for (const ChunkBlockDescription& Description : BlockDescriptions)
+    {
+        AllHashes.insert(AllHashes.end(), Description.ChunkRawHashes.begin(), Description.ChunkRawHashes.end());
+        AllRawSizes.insert(AllRawSizes.end(), Description.ChunkRawLengths.begin(), Description.ChunkRawLengths.end());
+    }
+
+    OutLocalChunkHashes.reserve(AllHashes.size());
+    OutLocalChunkRawSizes.reserve(AllRawSizes.size());
+    OutLocalChunkOrders.reserve(AbsoluteChunkOrders.size());
+
+    tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> HashToLocalIndex;
+    HashToLocalIndex.reserve(AllHashes.size());
+
+    for (uint32_t OrderIndex = 0; OrderIndex < AbsoluteChunkOrders.size(); OrderIndex++)
+    {
+        const uint32_t AbsoluteIndex = AbsoluteChunkOrders[OrderIndex];
+        const IoHash&  ChunkHash     = AllHashes[AbsoluteIndex];
+        const uint64_t ChunkRawSize  = AllRawSizes[AbsoluteIndex];
+
+        uint32_t AssignedLocalIndex = 0;
+        if (const auto Known = HashToLocalIndex.find(ChunkHash); Known != HashToLocalIndex.end())
+        {
+            AssignedLocalIndex = Known->second;
+        }
+        else
+        {
+            // First sighting of this hash: allocate the next local slot for it.
+            AssignedLocalIndex = gsl::narrow<uint32_t>(OutLocalChunkHashes.size());
+            OutLocalChunkHashes.push_back(ChunkHash);
+            OutLocalChunkRawSizes.push_back(ChunkRawSize);
+            HashToLocalIndex.insert_or_assign(ChunkHash, AssignedLocalIndex);
+        }
+        OutLocalChunkOrders.push_back(AssignedLocalIndex);
+
+#if EXTRA_VERIFY
+        const uint32_t  LocalChunkIndex   = OutLocalChunkOrders[OrderIndex];
+        const IoHash&   LocalChunkHash    = OutLocalChunkHashes[LocalChunkIndex];
+        const uint64_t& LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex];
+        ZEN_ASSERT(LocalChunkHash == ChunkHash);
+        ZEN_ASSERT(LocalChunkRawSize == ChunkRawSize);
+#endif  // EXTRA_VERIFY
+    }
+
+#if EXTRA_VERIFY
+    for (uint32_t VerifyOrderIndex = 0; VerifyOrderIndex < OutLocalChunkOrders.size(); VerifyOrderIndex++)
+    {
+        const uint32_t LocalChunkIndex   = OutLocalChunkOrders[VerifyOrderIndex];
+        const IoHash   LocalChunkHash    = OutLocalChunkHashes[LocalChunkIndex];
+        const uint64_t LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex];
+
+        const uint32_t VerifyChunkIndex   = AbsoluteChunkOrders[VerifyOrderIndex];
+        const IoHash   VerifyChunkHash    = AllHashes[VerifyChunkIndex];
+        const uint64_t VerifyChunkRawSize = AllRawSizes[VerifyChunkIndex];
+
+        ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+        ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+    }
+#endif  // EXTRA_VERIFY
+}
+
+/// Serializes the description of a build part into PartManifestWriter.
+///
+/// Fields written, in order:
+///  - "platform" and "totalSize" (the sum of RawSizes)
+///  - "files": paths, rawhashes, rawsizes, plus the per-file attribute array, stored as
+///    "attributes" for Windows and as "mode" for Linux/MacOS
+///  - "chunkedContent": sequenceRawHashes, chunkcounts, chunkorders
+///  - "chunkAttachments": hash and raw size of every loose chunk (only if there are any)
+///  - "blockAttachments": the block hashes (only if there are any)
+///
+/// Platform must not be SourcePlatform::_Count.
+void
+BuildsOperationUploadFolder::WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter,
+                                                              const SourcePlatform Platform,
+                                                              std::span<const std::filesystem::path> Paths,
+                                                              std::span<const IoHash> RawHashes,
+                                                              std::span<const uint64_t> RawSizes,
+                                                              std::span<const uint32_t> Attributes,
+                                                              std::span<const IoHash> SequenceRawHashes,
+                                                              std::span<const uint32_t> ChunkCounts,
+                                                              std::span<const IoHash> LocalChunkHashes,
+                                                              std::span<const uint64_t> LocalChunkRawSizes,
+                                                              const std::vector<uint32_t>& AbsoluteChunkOrders,
+                                                              const std::span<const uint32_t> LooseLocalChunkIndexes,
+                                                              const std::span<IoHash> BlockHashes)
+{
+    ZEN_ASSERT(Platform != SourcePlatform::_Count);
+    PartManifestWriter.AddString("platform"sv, ToString(Platform));
+
+    uint64_t SummedRawSize = 0;
+    for (const uint64_t FileRawSize : RawSizes)
+    {
+        SummedRawSize += FileRawSize;
+    }
+    PartManifestWriter.AddInteger("totalSize", SummedRawSize);
+
+    // --- files ---
+    PartManifestWriter.BeginObject("files"sv);
+    compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter);
+    compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter);
+    compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter);
+    switch (Platform)
+    {
+        case SourcePlatform::Windows:
+            compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter);
+            break;
+        case SourcePlatform::Linux:
+        case SourcePlatform::MacOS:
+            compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter);
+            break;
+        default:
+            // Any other platform gets no attribute array.
+            break;
+    }
+    PartManifestWriter.EndObject();
+
+    // --- chunkedContent ---
+    PartManifestWriter.BeginObject("chunkedContent");
+    compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter);
+    compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter);
+    compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter);
+    PartManifestWriter.EndObject();
+
+    // --- chunkAttachments (loose chunks) ---
+    if (!LooseLocalChunkIndexes.empty())
+    {
+        PartManifestWriter.BeginObject("chunkAttachments");
+
+        PartManifestWriter.BeginArray("rawHashes"sv);
+        for (const uint32_t LooseChunkIndex : LooseLocalChunkIndexes)
+        {
+            PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[LooseChunkIndex]);
+        }
+        PartManifestWriter.EndArray();
+
+        PartManifestWriter.BeginArray("chunkRawSizes"sv);
+        for (const uint32_t LooseChunkIndex : LooseLocalChunkIndexes)
+        {
+            PartManifestWriter.AddInteger(LocalChunkRawSizes[LooseChunkIndex]);
+        }
+        PartManifestWriter.EndArray();
+
+        PartManifestWriter.EndObject();
+    }
+
+    // --- blockAttachments ---
+    if (!BlockHashes.empty())
+    {
+        PartManifestWriter.BeginObject("blockAttachments");
+        compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter);
+        PartManifestWriter.EndObject();
+    }
+}
+
+/// Reads the raw bytes of the chunk with the given hash from its source data.
+///
+/// The chunk is resolved through Lookup.ChunkHashToChunkIndex and read through
+/// OpenFileCache from the first location recorded for it, using the raw size stored
+/// in Content. Asserts if the hash is unknown or has no recorded location.
+///
+/// @return The chunk data as returned by OpenFileCache.GetRange().
+CompositeBuffer
+BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content,
+                                        const ChunkedContentLookup& Lookup,
+                                        const IoHash& ChunkHash,
+                                        ReadFileCache& OpenFileCache)
+{
+    ZEN_TRACE_CPU("FetchChunk");
+
+    const auto IndexIt = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
+    ZEN_ASSERT(IndexIt != Lookup.ChunkHashToChunkIndex.end());
+    const uint32_t ChunkIndex = IndexIt->second;
+
+    const std::span<const ChunkedContentLookup::ChunkSequenceLocation> Locations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+    ZEN_ASSERT(!Locations.empty());
+
+    // Any location holds the same bytes; the first one is used.
+    const ChunkedContentLookup::ChunkSequenceLocation& Source = Locations.front();
+
+    CompositeBuffer Chunk = OpenFileCache.GetRange(Source.SequenceIndex, Source.Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+    ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+    return Chunk;
+}
+
+/// Builds a compressed block out of the given chunks.
+///
+/// For every chunk a (hash, fetch function) pair is handed to GenerateChunkBlock. The
+/// fetch function reads the chunk via FetchChunk and compresses it with
+/// OodleCompressor::Mermaid. The level is VeryFast only when the chunk hash is also
+/// present in Lookup.RawHashToSequenceIndex, the chunk is at least
+/// m_Options.MinimumSizeForCompressInBlock bytes and IsChunkCompressable() agrees;
+/// otherwise the level is None.
+///
+/// @param ChunksInBlock        Chunk indexes that make up the block, in block order.
+/// @param OutBlockDescription  Passed on to GenerateChunkBlock to be filled in.
+/// @return The block as returned by GenerateChunkBlock.
+CompressedBuffer
+BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
+                                           const ChunkedContentLookup& Lookup,
+                                           const std::vector<uint32_t>& ChunksInBlock,
+                                           ChunkBlockDescription& OutBlockDescription)
+{
+    ZEN_TRACE_CPU("GenerateBlock");
+
+    // The fetch functions capture this cache by reference, so they are only usable
+    // while this function is still running.
+    ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4);
+
+    std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
+    BlockContent.reserve(ChunksInBlock.size());
+
+    for (const uint32_t ChunkIndex : ChunksInBlock)
+    {
+        auto FetchAndCompress =
+            [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+            CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
+            if (!Chunk)
+            {
+                ZEN_ASSERT(false);
+            }
+            const uint64_t RawSize = Chunk.GetSize();
+
+            OodleCompressionLevel CompressionLevel = OodleCompressionLevel::None;
+            if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && (RawSize >= m_Options.MinimumSizeForCompressInBlock) &&
+                IsChunkCompressable(Content, Lookup, ChunkIndex))
+            {
+                CompressionLevel = OodleCompressionLevel::VeryFast;
+            }
+            return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
+        };
+
+        BlockContent.emplace_back(Content.ChunkedContent.ChunkHashes[ChunkIndex], std::move(FetchAndCompress));
+    }
+
+    return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
+}
+
+/// Recreates the compressed payload of a block from its stored header and source data.
+///
+/// The result is the segments of HeaderBuffer followed by the compressed form of every
+/// chunk in ChunksInBlock, in order. Each chunk is read from its first recorded location
+/// and compressed with OodleCompressor::Mermaid using the same level rule as
+/// GenerateBlock (VeryFast when the hash is in Lookup.RawHashToSequenceIndex, the chunk
+/// is at least m_Options.MinimumSizeForCompressInBlock bytes and IsChunkCompressable()
+/// agrees; None otherwise).
+///
+/// @param HeaderBuffer   Header segments of the block.
+/// @param ChunksInBlock  Chunk indexes that make up the block, in block order.
+/// @return The concatenated buffers wrapped by CompressedBuffer::FromCompressedNoValidate().
+CompressedBuffer
+BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content,
+                                          const ChunkedContentLookup& Lookup,
+                                          CompositeBuffer&& HeaderBuffer,
+                                          const std::vector<uint32_t>& ChunksInBlock)
+{
+    ZEN_TRACE_CPU("RebuildBlock");
+    ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4);
+
+    // Start from the header segments, then append each chunk's compressed segments.
+    std::vector<SharedBuffer> ResultBuffers;
+    ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
+    ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
+
+    for (const uint32_t ChunkIndex : ChunksInBlock)
+    {
+        const std::span<const ChunkedContentLookup::ChunkSequenceLocation> Locations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+        ZEN_ASSERT(!Locations.empty());
+        const ChunkedContentLookup::ChunkSequenceLocation& Source = Locations.front();
+
+        const IoHash&   ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+        CompositeBuffer Chunk = OpenFileCache.GetRange(Source.SequenceIndex, Source.Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+        ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+
+        // The size is read before Chunk is moved into Compress() below.
+        const uint64_t RawSize = Chunk.GetSize();
+
+        OodleCompressionLevel CompressionLevel = OodleCompressionLevel::None;
+        if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && (RawSize >= m_Options.MinimumSizeForCompressInBlock) &&
+            IsChunkCompressable(Content, Lookup, ChunkIndex))
+        {
+            CompressionLevel = OodleCompressionLevel::VeryFast;
+        }
+
+        CompositeBuffer CompressedChunk =
+            CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
+        ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+    }
+
+    return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
+}
+
+void
+BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ std::span<IoHash> RawHashes,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ const std::uint64_t LargeAttachmentSize,
+ UploadStatistics& TempUploadStats,
+ LooseChunksStatistics& TempLooseChunksStats)
+{
+ ZEN_TRACE_CPU("UploadPartBlobs");
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Upload Blobs"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ WorkerThreadPool& ReadChunkPool = m_IOWorkerPool;
+ WorkerThreadPool& UploadChunkPool = m_NetworkPool;
+
+ FilteredRate FilteredGenerateBlockBytesPerSecond;
+ FilteredRate FilteredCompressedBytesPerSecond;
+ FilteredRate FilteredUploadedBytesPerSecond;
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> UploadedBlockSize = 0;
+ std::atomic<size_t> UploadedBlockCount = 0;
+ std::atomic<size_t> UploadedRawChunkSize = 0;
+ std::atomic<size_t> UploadedCompressedChunkSize = 0;
+ std::atomic<uint32_t> UploadedChunkCount = 0;
+
+ tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex;
+ ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size());
+ for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++)
+ {
+ ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex);
+ }
+
+ std::vector<size_t> BlockIndexes;
+ std::vector<uint32_t> LooseChunkOrderIndexes;
+
+ uint64_t TotalLooseChunksSize = 0;
+ uint64_t TotalBlocksSize = 0;
+ for (const IoHash& RawHash : RawHashes)
+ {
+ if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end())
+ {
+ BlockIndexes.push_back(It->second);
+ TotalBlocksSize += NewBlocks.BlockSizes[It->second];
+ }
+ else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIndexIt->second;
+ if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex);
+ LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end())
+ {
+ LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second);
+ TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ }
+ else
+ {
+ ZEN_CONSOLE_WARN(
+ "Build blob {} was reported as needed for upload but it was reported as existing at the start of the "
+ "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization",
+ RawHash);
+ }
+ }
+ uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize;
+
+ const size_t UploadBlockCount = BlockIndexes.size();
+ const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
+
+ auto AsyncUploadBlock = [this,
+ &Work,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &TempUploadStats,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool](const size_t BlockIndex,
+ const IoHash BlockHash,
+ CompositeBuffer&& Payload,
+ std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
+ bool IsInMemoryBlock = true;
+ if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
+ std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString());
+ Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath));
+ IsInMemoryBlock = false;
+ }
+ else
+ {
+ QueuedPendingInMemoryBlocksForUpload++;
+ }
+
+ Work.ScheduleWork(
+ UploadChunkPool,
+ [this,
+ &QueuedPendingInMemoryBlocksForUpload,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &TempUploadStats,
+ &FilteredUploadedBytesPerSecond,
+ IsInMemoryBlock,
+ BlockIndex,
+ BlockHash,
+ Payload = std::move(Payload)](std::atomic<bool>&) mutable {
+ auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] {
+ if (IsInMemoryBlock)
+ {
+ QueuedPendingInMemoryBlocksForUpload--;
+ }
+ });
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ FilteredUploadedBytesPerSecond.Start();
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(PayloadSize),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ }
+ UploadedBlockSize += PayloadSize;
+ TempUploadStats.BlocksBytes += PayloadSize;
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
+ }
+
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ TempUploadStats.BlocksBytes += BlockMetaData.GetSize();
+ }
+
+ TempUploadStats.BlockCount++;
+
+ UploadedBlockCount++;
+ if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ }
+ });
+ };
+
+ auto AsyncUploadLooseChunk = [this,
+ LargeAttachmentSize,
+ &Work,
+ &UploadChunkPool,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &TempUploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
+ Work.ScheduleWork(
+ UploadChunkPool,
+ [this,
+ &Work,
+ LargeAttachmentSize,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &TempUploadStats,
+ RawHash,
+ RawSize,
+ Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
+ if (PayloadSize >= LargeAttachmentSize)
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
+ TempUploadStats.MultipartAttachmentCount++;
+ std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob(
+ m_BuildId,
+ RawHash,
+ ZenContentType::kCompressedBinary,
+ PayloadSize,
+ [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset,
+ uint64_t Size) mutable -> IoBuffer {
+ FilteredUploadedBytesPerSecond.Start();
+
+ IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer();
+ PartPayload.SetContentType(ZenContentType::kBinary);
+ return PartPayload;
+ },
+ [RawSize,
+ &TempUploadStats,
+ &UploadedCompressedChunkSize,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ UploadBlockCount,
+ &UploadedChunkCount,
+ UploadChunkCount,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) {
+ TempUploadStats.ChunksBytes += SentBytes;
+ UploadedCompressedChunkSize += SentBytes;
+ if (IsComplete)
+ {
+ TempUploadStats.ChunkCount++;
+ UploadedChunkCount++;
+ if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ UploadedRawChunkSize += RawSize;
+ }
+ });
+ for (auto& WorkPart : MultipartWork)
+ {
+ Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
+ if (!AbortFlag)
+ {
+ Work();
+ }
+ });
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ }
+ else
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ TempUploadStats.ChunksBytes += Payload.GetSize();
+ TempUploadStats.ChunkCount++;
+ UploadedCompressedChunkSize += Payload.GetSize();
+ UploadedRawChunkSize += RawSize;
+ UploadedChunkCount++;
+ if (UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ };
+
+ std::vector<size_t> GenerateBlockIndexes;
+
+ std::atomic<uint64_t> GeneratedBlockCount = 0;
+ std::atomic<uint64_t> GeneratedBlockByteCount = 0;
+
+ std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
+
+ // Start generation of any non-prebuilt blocks and schedule upload
+ for (const size_t BlockIndex : BlockIndexes)
+ {
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [this,
+ BlockHash = IoHash(BlockHash),
+ BlockIndex,
+ &FilteredGenerateBlockBytesPerSecond,
+ &Content,
+ &Lookup,
+ &NewBlocks,
+ &NewBlockChunks,
+ &GenerateBlockIndexes,
+ &GeneratedBlockCount,
+ &GeneratedBlockByteCount,
+ &AsyncUploadBlock,
+ &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+
+ FilteredGenerateBlockBytesPerSecond.Start();
+
+ Stopwatch GenerateTimer;
+ CompositeBuffer Payload;
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ Payload =
+ RebuildBlock(Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex])
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription);
+ if (!CompressedBlock)
+ {
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ }
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
+ }
+
+ GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
+ GeneratedBlockCount++;
+ if (GeneratedBlockCount == GenerateBlockIndexes.size())
+ {
+ FilteredGenerateBlockBytesPerSecond.Stop();
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "{} block {} ({}) containing {} chunks in {}",
+ NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
+ NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ }
+ if (!m_AbortFlag)
+ {
+ AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
+ }
+ }
+ });
+ }
+ }
+
+ // Start compression of any non-precompressed loose chunks and schedule upload
+ for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
+ {
+ const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [this,
+ &Content,
+ &Lookup,
+ &TempLooseChunksStats,
+ &LooseChunkOrderIndexes,
+ &FilteredCompressedBytesPerSecond,
+ &TempUploadStats,
+ &AsyncUploadLooseChunk,
+ ChunkIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
+ FilteredCompressedBytesPerSecond.Start();
+ Stopwatch CompressTimer;
+ CompositeBuffer Payload = CompressChunk(Content, Lookup, ChunkIndex, TempLooseChunksStats);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Compressed chunk {} ({} -> {}) in {}",
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
+ }
+ const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ TempUploadStats.ReadFromDiskBytes += ChunkRawSize;
+ if (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
+ {
+ FilteredCompressedBytesPerSecond.Stop();
+ }
+ if (!m_AbortFlag)
+ {
+ AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
+ }
+ }
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load());
+ FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
+ FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
+ uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
+ uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
+
+ std::string Details = fmt::format(
+ "Compressed {}/{} ({}/{} {}B/s) chunks. "
+ "Uploaded {}/{} ({}/{}) blobs "
+ "({} {}bits/s)",
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ LooseChunkOrderIndexes.size(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes),
+ NiceBytes(TotalLooseChunksSize),
+ NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
+
+ UploadedBlockCount.load() + UploadedChunkCount.load(),
+ UploadBlockCount + UploadChunkCount,
+ NiceBytes(UploadedRawSize),
+ NiceBytes(TotalRawSize),
+
+ NiceBytes(UploadedCompressedSize),
+ NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()));
+
+ Progress.UpdateState({.Task = "Uploading blobs ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
+
+ Progress.Finish();
+
+ TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+ }
+}
+
+CompositeBuffer // Reads one chunk's raw bytes from its source file and returns them wrapped as a compressed-buffer payload.
+BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, // supplies chunk hashes, raw sizes and relative paths
+ const ChunkedContentLookup& Lookup, // used to locate the chunk inside a source file
+ uint32_t ChunkIndex, // index into Content.ChunkedContent.ChunkHashes / ChunkRawSizes
+ LooseChunksStatistics& TempLooseChunksStats) // receives compressed count / raw bytes / compressed bytes
+{ // Throws std::runtime_error when the chunk cannot be read, the temp file cannot be created, or compression fails.
+ ZEN_TRACE_CPU("CompressChunk");
+ ZEN_ASSERT(!m_Options.TempDir.empty()); // temp files below are created under m_Options.TempDir
+ const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+ const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; // only the first recorded location is used
+ const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
+ IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); // ChunkSize bytes at Source.Offset of the file under m_Path
+ if (!RawSource)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
+ }
+ if (RawSource.GetSize() != ChunkSize) // a buffer of a different size than requested is treated as an error
+ {
+ throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
+ }
+
+ const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; // level None is still passed through the compressed-buffer API below
+
+ if (ShouldCompressChunk) // first attempt: stream the compressed output straight into a temp file
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString(); // temp file is named after the chunk hash
+
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); // NOTE(review): presumably truncate + delete-on-close semantics - confirm in BasicFile
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
+ }
+
+ uint64_t StreamRawBytes = 0; // what this stream attempt added to the shared stats, kept so it can be rolled back
+ uint64_t StreamCompressedBytes = 0;
+
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawSource)),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { // invoked with each produced output range
+ ZEN_UNUSED(SourceOffset);
+ TempLooseChunksStats.CompressedChunkRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset); // range is written at the output offset supplied to the callback
+ TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ StreamRawBytes += SourceSize;
+ StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ OodleCompressor::Mermaid,
+ CompressionLevel);
+ if (CouldCompress)
+ {
+ uint64_t CompressedSize = CompressedFile.FileSize();
+ void* FileHandle = CompressedFile.Detach(); // handle is handed over to the IoBuffer constructed below
+ IoBuffer TempPayload = IoBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ CompressedSize,
+ /*IsWholeFile*/ true);
+ ZEN_ASSERT(TempPayload);
+ TempPayload.SetDeleteOnClose(true);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); // re-parse the written file and extract raw hash/size
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(RawHash == ChunkHash); // the streamed file must describe exactly this chunk
+ ZEN_ASSERT(RawSize == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkCount++;
+
+ return Compressed.GetCompressed();
+ }
+ else
+ {
+ TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; // undo the stats from the failed stream attempt; the fallback below adds them again
+ TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
+ }
+ CompressedFile.Close();
+ RemoveFile(TempFilePath, Ec); // best effort cleanup, the error code is deliberately ignored
+ ZEN_UNUSED(Ec);
+ }
+
+ CompressedBuffer CompressedBlob = // fallback path, also the only path when ShouldCompressChunk is false: one-shot Compress() call
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
+ }
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize;
+ TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
+
+ // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk
+ if (ShouldCompressChunk) // only reached with true when the streaming attempt above reported failure
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString());
+ IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath);
+ CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); // re-wrap the temp-file payload without validating it again
+ }
+
+ TempLooseChunksStats.CompressedChunkCount++;
+ return std::move(CompressedBlob).GetCompressed();
+}
+
} // namespace zen
diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp
index 5cfd80666..3cda5f00f 100644
--- a/src/zenremotestore/builds/filebuildstorage.cpp
+++ b/src/zenremotestore/builds/filebuildstorage.cpp
@@ -187,7 +187,11 @@ public:
CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView();
- Writer.AddObject("metadata"sv, MetaDataView);
+ for (CbFieldView InnerField : MetaDataView)
+ {
+ Writer.AddField(InnerField.GetName(), InnerField);
+ }
+
Writer.BeginObject("parts"sv);
{
for (CbFieldView PartView : PartsObject)
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 0e719edc6..8ba32127a 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -22,6 +22,7 @@ class HttpClient;
class ParallelWork;
class WorkerThreadPool;
class FilteredRate;
+class ReadFileCache;
class BufferedWriteFileCache;
struct ChunkBlockDescription;
@@ -192,8 +193,8 @@ public:
bool PrimeCacheOnly = false;
bool EnableOtherDownloadsScavenging = true;
bool EnableTargetFolderScavenging = true;
- std::vector<std::string> DefaultExcludeFolders;
- std::vector<std::string> DefaultExcludeExtensions;
+ std::vector<std::string> ExcludeFolders;
+ std::vector<std::string> ExcludeExtensions;
};
BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput,
@@ -210,11 +211,11 @@ public:
const ChunkedContentLookup& RemoteLookup,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
- const Options& Options,
- DiskStatistics& DiskStats);
+ const Options& Options);
void Execute(FolderContent& OutLocalFolderState);
+ DiskStatistics m_DiskStats;
CacheMappingStatistics m_CacheMappingStats;
GetFolderContentStatistics m_ScavengedFolderScanStats;
DownloadStatistics m_DownloadStats;
@@ -372,10 +373,244 @@ private:
const std::vector<ChunkBlockDescription>& m_BlockDescriptions;
const std::vector<IoHash>& m_LooseChunkHashes;
const Options m_Options;
- DiskStatistics& m_DiskStats;
const std::filesystem::path m_CacheFolderPath;
};
+struct FindBlocksStatistics // Plain (non-atomic) zero-initialized counters; the names suggest block lookup/reuse results - TODO confirm against the code that fills them in
+{
+ uint64_t FindBlockTimeMS = 0; // unit per the name: milliseconds
+ uint64_t PotentialChunkCount = 0;
+ uint64_t PotentialChunkByteCount = 0;
+ uint64_t FoundBlockCount = 0;
+ uint64_t FoundBlockChunkCount = 0;
+ uint64_t FoundBlockByteCount = 0;
+ uint64_t AcceptedBlockCount = 0;
+ uint64_t AcceptedChunkCount = 0;
+ uint64_t AcceptedByteCount = 0;
+ uint64_t AcceptedRawByteCount = 0;
+ uint64_t RejectedBlockCount = 0;
+ uint64_t RejectedChunkCount = 0;
+ uint64_t RejectedByteCount = 0;
+ uint64_t AcceptedReduntantChunkCount = 0; // NOTE(review): "Reduntant" is presumably a typo for "Redundant"; renaming would affect every user of the field
+ uint64_t AcceptedReduntantByteCount = 0; // NOTE(review): same spelling as above
+ uint64_t NewBlocksCount = 0;
+ uint64_t NewBlocksChunkCount = 0;
+ uint64_t NewBlocksChunkByteCount = 0;
+};
+
+struct UploadStatistics // Upload counters; the atomic members are updated from worker lambdas in BuildsOperationUploadFolder::UploadPartBlobs
+{
+ std::atomic<uint64_t> BlockCount = 0; // incremented once per uploaded block
+ std::atomic<uint64_t> BlocksBytes = 0; // block payload bytes, plus block metadata bytes when the metadata upload succeeded
+ std::atomic<uint64_t> ChunkCount = 0; // incremented once per uploaded loose chunk (for multipart: when reported complete)
+ std::atomic<uint64_t> ChunksBytes = 0; // loose chunk payload bytes sent
+ std::atomic<uint64_t> ReadFromDiskBytes = 0; // raw size of each loose chunk read for compression
+ std::atomic<uint64_t> MultipartAttachmentCount = 0; // incremented once per chunk uploaded via the multipart path
+ uint64_t ElapsedWallTimeUS = 0; // non-atomic; accumulated after the parallel work has been waited on
+
+ UploadStatistics& operator+=(const UploadStatistics& Rhs) // adds every field of Rhs into this object and returns *this
+ {
+ BlockCount += Rhs.BlockCount; // each atomic is read and added on its own, so the sum as a whole is not one atomic snapshot
+ BlocksBytes += Rhs.BlocksBytes;
+ ChunkCount += Rhs.ChunkCount;
+ ChunksBytes += Rhs.ChunksBytes;
+ ReadFromDiskBytes += Rhs.ReadFromDiskBytes;
+ MultipartAttachmentCount += Rhs.MultipartAttachmentCount;
+ ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS;
+ return *this;
+ }
+};
+
+struct LooseChunksStatistics // Counters for loose chunks; the atomic members are updated by BuildsOperationUploadFolder::CompressChunk
+{
+ uint64_t ChunkCount = 0; // not written by the upload/compress code shown next to this struct - presumably filled in by the caller, TODO confirm
+ uint64_t ChunkByteCount = 0; // see note on ChunkCount
+ std::atomic<uint64_t> CompressedChunkCount = 0; // incremented once per chunk returned by CompressChunk
+ std::atomic<uint64_t> CompressedChunkRawBytes = 0; // raw (input) bytes consumed by compression
+ std::atomic<uint64_t> CompressedChunkBytes = 0; // compressed (output) bytes produced
+ uint64_t CompressChunksElapsedWallTimeUS = 0; // non-atomic; accumulated in UploadPartBlobs after its parallel work has been waited on
+
+ LooseChunksStatistics& operator+=(const LooseChunksStatistics& Rhs) // adds every field of Rhs into this object and returns *this
+ {
+ ChunkCount += Rhs.ChunkCount;
+ ChunkByteCount += Rhs.ChunkByteCount;
+ CompressedChunkCount += Rhs.CompressedChunkCount; // each atomic is read and added on its own, not as one combined snapshot
+ CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes;
+ CompressedChunkBytes += Rhs.CompressedChunkBytes;
+ CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS;
+ return *this;
+ }
+};
+
+struct GenerateBlocksStatistics // Counters for block generation; presumably filled in by GenerateBuildBlocks - TODO confirm, no writer is visible next to this declaration
+{
+ std::atomic<uint64_t> GeneratedBlockByteCount = 0; // atomic, so it can be updated from several threads
+ std::atomic<uint64_t> GeneratedBlockCount = 0;
+ uint64_t GenerateBlocksElapsedWallTimeUS = 0; // non-atomic; unit per the name: microseconds
+
+ GenerateBlocksStatistics& operator+=(const GenerateBlocksStatistics& Rhs) // adds every field of Rhs into this object and returns *this
+ {
+ GeneratedBlockByteCount += Rhs.GeneratedBlockByteCount; // each atomic is read and added on its own, not as one combined snapshot
+ GeneratedBlockCount += Rhs.GeneratedBlockCount;
+ GenerateBlocksElapsedWallTimeUS += Rhs.GenerateBlocksElapsedWallTimeUS;
+ return *this;
+ }
+};
+
+class BuildsOperationUploadFolder // Upload-folder build operation (moved out of builds_cmd.cpp per the commit message); construct, call Execute(), then read the public statistics members
+{
+public:
+ struct ChunksBlockParameters // limits applied to generated chunk blocks (meaning inferred from the names - TODO confirm in ArrangeChunksIntoBlocks)
+ {
+ size_t MaxBlockSize = 64u * 1024u * 1024u; // 64 MiB
+ size_t MaxChunksPerBlock = 4u * 1000u; // 4000
+ size_t MaxChunkEmbedSize = 3u * 512u * 1024u; // 1.5 MiB
+ };
+
+ struct Options // NOTE: the const data members below make this struct non-copy-assignable; it can still be copy-constructed
+ {
+ bool IsQuiet = false;
+ bool IsVerbose = false; // when set, the upload/compress code logs one line per generated/compressed/uploaded item
+
+ const uint64_t FindBlockMaxCount = 10000;
+ const uint8_t BlockReuseMinPercentLimit = 85;
+ bool AllowMultiparts = true;
+ bool IgnoreExistingBlocks = false;
+ ChunksBlockParameters BlockParameters;
+
+ uint32_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; // 32 MiB
+
+ const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; // 2 KiB
+
+ std::filesystem::path TempDir; // must be non-empty: CompressChunk asserts this and creates its temp files here
+ std::vector<std::string> ExcludeFolders;
+ std::vector<std::string> ExcludeExtensions;
+ std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
+ };
+ BuildsOperationUploadFolder(BuildOpLogOutput& LogOutput, // the reference parameters are stored as reference members, so the referenced objects must outlive this object
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ const std::string_view BuildPartName,
+ const std::filesystem::path& Path,
+ const std::filesystem::path& ManifestPath,
+ bool CreateBuild,
+ const CbObject& MetaData,
+ const Options& Options);
+
+ void Execute();
+
+ DiskStatistics m_DiskStats; // public statistics members, readable by the caller
+ GetFolderContentStatistics m_LocalFolderScanStats;
+ ChunkingStatistics m_ChunkingStats;
+ FindBlocksStatistics m_FindBlocksStats;
+ UploadStatistics m_UploadStats;
+ GenerateBlocksStatistics m_GenerateBlocksStats;
+ LooseChunksStatistics m_LooseChunksStats;
+
+private:
+ std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks,
+ std::span<const IoHash> ChunkHashes,
+ std::span<const uint32_t> ChunkIndexes,
+ std::vector<uint32_t>& OutUnusedChunkIndexes);
+ void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ std::vector<uint32_t>& ChunkIndexes,
+ std::vector<std::vector<uint32_t>>& OutBlocks);
+ struct GeneratedBlocks // parallel arrays: UploadPartBlobs indexes every vector below with the same block index
+ {
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ std::vector<uint64_t> BlockSizes;
+ std::vector<CompositeBuffer> BlockHeaders; // a non-empty header makes UploadPartBlobs call RebuildBlock instead of GenerateBlock
+ std::vector<CbObject> BlockMetaDatas;
+ std::vector<uint8_t>
+ MetaDataHasBeenUploaded; // NOTE: Do not use std::vector<bool> here as this vector is modified by multiple threads
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex; // block hash -> index into the vectors above
+ };
+
+ void GenerateBuildBlocks(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& OutBlocks);
+
+ std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
+ const std::span<const uint32_t> LocalChunkOrder,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
+ const std::span<const uint32_t>& LooseChunkIndexes,
+ const std::span<const ChunkBlockDescription>& BlockDescriptions);
+ void CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders,
+ const std::span<const IoHash> LooseChunkHashes,
+ const std::span<const uint64_t> LooseChunkRawSizes,
+ const std::span<const ChunkBlockDescription>& BlockDescriptions,
+ std::vector<IoHash>& OutLocalChunkHashes,
+ std::vector<uint64_t>& OutLocalChunkRawSizes,
+ std::vector<uint32_t>& OutLocalChunkOrders);
+
+ void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter,
+ const SourcePlatform Platform,
+ std::span<const std::filesystem::path> Paths,
+ std::span<const IoHash> RawHashes,
+ std::span<const uint64_t> RawSizes,
+ std::span<const uint32_t> Attributes,
+ std::span<const IoHash> SequenceRawHashes,
+ std::span<const uint32_t> ChunkCounts,
+ std::span<const IoHash> LocalChunkHashes,
+ std::span<const uint64_t> LocalChunkRawSizes,
+ const std::vector<uint32_t>& AbsoluteChunkOrders,
+ const std::span<const uint32_t> LooseLocalChunkIndexes,
+ const std::span<IoHash> BlockHashes);
+
+ CompositeBuffer FetchChunk(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const IoHash& ChunkHash,
+ ReadFileCache& OpenFileCache);
+
+ CompressedBuffer GenerateBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<uint32_t>& ChunksInBlock,
+ ChunkBlockDescription& OutBlockDescription);
+
+ CompressedBuffer RebuildBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ CompositeBuffer&& HeaderBuffer,
+ const std::vector<uint32_t>& ChunksInBlock);
+
+ void UploadPartBlobs(const ChunkedFolderContent& Content, // generates/compresses and uploads the blobs listed in RawHashes, then waits for all scheduled work
+ const ChunkedContentLookup& Lookup,
+ std::span<IoHash> RawHashes,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ const std::uint64_t LargeAttachmentSize, // payloads of at least this size take the multipart upload path
+ UploadStatistics& TempUploadStats,
+ LooseChunksStatistics& TempLooseChunksStats);
+
+ CompositeBuffer CompressChunk(const ChunkedFolderContent& Content, // reads one chunk from its source file and returns its compressed-buffer payload
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex,
+ LooseChunksStatistics& TempLooseChunksStats);
+
+ BuildOpLogOutput& m_LogOutput;
+ StorageInstance& m_Storage;
+ std::atomic<bool>& m_AbortFlag; // checked by worker lambdas before they start work
+ std::atomic<bool>& m_PauseFlag;
+ WorkerThreadPool& m_IOWorkerPool; // UploadPartBlobs schedules block generation and chunk compression here
+ WorkerThreadPool& m_NetworkPool; // UploadPartBlobs schedules uploads here
+ const Oid m_BuildId;
+ const Oid m_BuildPartId;
+ const std::string m_BuildPartName;
+
+ const std::filesystem::path m_Path; // root folder; CompressChunk resolves Content.Paths entries relative to it
+ const std::filesystem::path m_ManifestPath;
+ const bool m_CreateBuild; // TODO(review): open question from the author - does this need to be a member?
+ const CbObject m_MetaData; // TODO(review): open question from the author - does this need to be a member?
+ const Options m_Options;
+};
+
void DownloadLargeBlob(BuildStorage& Storage,
const std::filesystem::path& DownloadFolder,
const Oid& BuildId,
diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp
index 040726c77..5db6d8c04 100644
--- a/src/zenutil/commandlineoptions.cpp
+++ b/src/zenutil/commandlineoptions.cpp
@@ -132,33 +132,6 @@ StripCommandlineQuotes(std::vector<std::string>& InOutArgs)
return RawArgs;
}
-void
-MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path)
-{
- if (!Path.empty())
- {
- std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred();
-#if ZEN_PLATFORM_WINDOWS
- const std::string_view Prefix = "\\\\?\\";
- const std::u8string PrefixU8(Prefix.begin(), Prefix.end());
- std::u8string PathString = AbsolutePath.u8string();
- if (!PathString.empty() && !PathString.starts_with(PrefixU8))
- {
- PathString.insert(0, PrefixU8);
- Path = PathString;
- }
-#endif // ZEN_PLATFORM_WINDOWS
- }
-}
-
-std::filesystem::path
-MakeSafeAbsolutePath(const std::filesystem::path& Path)
-{
- std::filesystem::path Tmp(Path);
- MakeSafeAbsolutePathÍnPlace(Tmp);
- return Tmp;
-}
-
std::filesystem::path
StringToPath(const std::string_view& Path)
{
diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h
index f927d41e5..d6a171242 100644
--- a/src/zenutil/include/zenutil/commandlineoptions.h
+++ b/src/zenutil/include/zenutil/commandlineoptions.h
@@ -17,12 +17,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-std::vector<std::string> ParseCommandLine(std::string_view CommandLine);
-std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs);
-void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path);
-[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path);
-std::filesystem::path StringToPath(const std::string_view& Path);
-std::string_view RemoveQuotes(const std::string_view& Arg);
+std::vector<std::string> ParseCommandLine(std::string_view CommandLine);
+std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs);
+std::filesystem::path StringToPath(const std::string_view& Path);
+std::string_view RemoveQuotes(const std::string_view& Arg);
void commandlineoptions_forcelink(); // internal