about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2025-03-03 17:53:11 +0100
committer: GitHub Enterprise <[email protected]> 2025-03-03 17:53:11 +0100
commit: 1270bfeffbc81b1e4940c5c454ee6acde43e696a (patch)
tree: 9ff53df6b43f2806fb5701b4d10ad37696a1c203 /src
parent: builds download incremental (#290) (diff)
download: zen-1270bfeffbc81b1e4940c5c454ee6acde43e696a.tar.xz
          zen-1270bfeffbc81b1e4940c5c454ee6acde43e696a.zip
refactor use chunk sequence download (#291)
* work on chunk sequences on download, not paths
* write chunk sequences to .tmp file and move when complete
* cleanup
* Added on-the-fly validation in `zen builds download` of files built from smaller chunks as each file is completed
  Added `--verify` option to `zen builds upload` to verify all uploaded data once the entire upload is complete
  Added `--verify` option to `zen builds download` to verify all files in the target folder once the entire download is complete
  Fixed/improved progress updates
  Multithreaded part validation
* added rates to Write Chunks task
* b/s -> bits/s
* don't validate partial content as complete payload
* handle legacy C# builds
Diffstat (limited to 'src')
-rw-r--r--  src/zen/cmds/builds_cmd.cpp                   1324
-rw-r--r--  src/zen/cmds/builds_cmd.h                        2
-rw-r--r--  src/zenhttp/httpclient.cpp                       5
-rw-r--r--  src/zenutil/chunkedcontent.cpp                  78
-rw-r--r--  src/zenutil/include/zenutil/chunkedcontent.h    53
5 files changed, 910 insertions, 552 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 28c794559..219d01240 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -286,7 +286,7 @@ namespace {
}
}
- uint64_t GetCurrent() const
+ uint64_t GetCurrent() const // If Stopped - return total count / total time
{
if (LastTimeUS == (uint64_t)-1)
{
@@ -330,6 +330,16 @@ namespace {
return Count * 1000000 / ElapsedWallTimeUS;
}
+ std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred();
+ }
+
+ std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ }
+
ChunkedFolderContent ScanAndChunkFolder(
GetFolderContentStatistics& GetFolderContentStats,
ChunkingStatistics& ChunkingStats,
@@ -364,15 +374,16 @@ namespace {
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ GetFolderContentStats.AcceptedFileCount.load(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(GetFolderContentStats.FoundFileByteCount),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
ProgressBar.UpdateState({.Task = "Scanning files ",
- .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- GetFolderContentStats.AcceptedFileCount.load(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- NiceBytes(GetFolderContentStats.FoundFileByteCount),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load())),
+ .Details = Details,
.TotalCount = GetFolderContentStats.AcceptedFileByteCount,
.RemainingCount = GetFolderContentStats.AcceptedFileByteCount - ChunkingStats.BytesHashed.load()},
false);
@@ -766,9 +777,19 @@ namespace {
}
}
- if (FilesObject["chunkcounts"sv])
+ if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView)
+ {
+ compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes);
+ compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts);
+ if (OutChunkCounts.size() != OutSequenceRawHashes.size())
+ {
+ throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths"));
+ }
+ compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders);
+ }
+ else if (FilesObject["chunkcounts"sv])
{
- // Legacy style
+ // Legacy zen style
std::vector<uint32_t> LegacyChunkCounts;
compactbinary_helpers::ReadArray("chunkcounts"sv, FilesObject, LegacyChunkCounts);
@@ -805,15 +826,29 @@ namespace {
OrderIndexOffset += LegacyChunkCounts[PathIndex];
}
}
- if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView)
+ else
{
- compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes);
- compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts);
- if (OutChunkCounts.size() != OutSequenceRawHashes.size())
+ // Legacy C# style
+
+ tsl::robin_set<IoHash, IoHash::Hasher> FoundRawHashes;
+ FoundRawHashes.reserve(PathCount);
+ uint32_t OrderIndexOffset = 0;
+ for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++)
{
- throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths"));
+ if (OutRawSizes[PathIndex] > 0)
+ {
+ const IoHash& PathRawHash = OutRawHashes[PathIndex];
+ if (FoundRawHashes.insert(PathRawHash).second)
+ {
+ OutSequenceRawHashes.push_back(PathRawHash);
+ OutChunkCounts.push_back(1);
+ OutAbsoluteChunkOrders.push_back(OrderIndexOffset);
+ OutLooseChunkHashes.push_back(PathRawHash);
+ OutLooseChunkRawSizes.push_back(OutRawSizes[PathIndex]);
+ OrderIndexOffset += 1;
+ }
+ }
}
- compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders);
}
CbObjectView ChunkAttachmentsView = BuildPartManifest["chunkAttachments"sv].AsObjectView();
@@ -963,14 +998,14 @@ namespace {
{
public:
// A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten
- ReadFileCache(DiskStatistics& DiskStats,
- const std::filesystem::path& Path,
- const std::vector<std::filesystem::path>& Paths,
- const std::vector<uint64_t>& RawSizes,
- size_t MaxOpenFileCount)
+ ReadFileCache(DiskStatistics& DiskStats,
+ const std::filesystem::path& Path,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ size_t MaxOpenFileCount)
: m_Path(Path)
- , m_Paths(Paths)
- , m_RawSizes(RawSizes)
+ , m_LocalContent(LocalContent)
+ , m_LocalLookup(LocalLookup)
, m_DiskStats(DiskStats)
{
m_OpenFiles.reserve(MaxOpenFileCount);
@@ -981,26 +1016,28 @@ namespace {
m_OpenFiles.clear();
}
- CompositeBuffer GetRange(uint32_t PathIndex, uint64_t Offset, uint64_t Size)
+ CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
{
- auto CacheIt =
- std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [PathIndex](const auto& Lhs) { return Lhs.first == PathIndex; });
+ auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) {
+ return Lhs.first == SequenceIndex;
+ });
if (CacheIt != m_OpenFiles.end())
{
if (CacheIt != m_OpenFiles.begin())
{
auto CachedFile(std::move(CacheIt->second));
m_OpenFiles.erase(CacheIt);
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(PathIndex, std::move(CachedFile)));
+ m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
}
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
m_DiskStats.ReadByteCount += Result.GetSize();
return Result;
}
- const std::filesystem::path AttachmentPath = (m_Path / m_Paths[PathIndex]).make_preferred();
- if (Size == m_RawSizes[PathIndex])
+ const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ if (Size == m_LocalContent.RawSizes[LocalPathIndex])
{
- IoBuffer Result = IoBufferBuilder::MakeFromFile(AttachmentPath);
+ IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
m_DiskStats.OpenReadCount++;
m_DiskStats.ReadByteCount += Result.GetSize();
return CompositeBuffer(SharedBuffer(Result));
@@ -1010,7 +1047,7 @@ namespace {
m_OpenFiles.pop_back();
m_DiskStats.CurrentOpenFileCount--;
}
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(PathIndex, std::make_unique<BufferedOpenFile>(AttachmentPath)));
+ m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath)));
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
m_DiskStats.ReadByteCount += Result.GetSize();
m_DiskStats.OpenReadCount++;
@@ -1020,23 +1057,14 @@ namespace {
private:
const std::filesystem::path m_Path;
- const std::vector<std::filesystem::path>& m_Paths;
- const std::vector<uint64_t>& m_RawSizes;
+ const ChunkedFolderContent& m_LocalContent;
+ const ChunkedContentLookup& m_LocalLookup;
std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles;
DiskStatistics& m_DiskStats;
};
- CompositeBuffer ValidateBlob(BuildStorage& Storage,
- const Oid& BuildId,
- const IoHash& BlobHash,
- uint64_t& OutCompressedSize,
- uint64_t& OutDecompressedSize)
+ CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize)
{
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash);
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash));
- }
if (Payload.GetContentType() != ZenContentType::kCompressedBinary)
{
throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'",
@@ -1080,13 +1108,26 @@ namespace {
return DecompressedComposite;
}
- ChunkBlockDescription ValidateChunkBlock(BuildStorage& Storage,
- const Oid& BuildId,
+ CompositeBuffer ValidateBlob(BuildStorage& Storage,
+ const Oid& BuildId,
+ const IoHash& BlobHash,
+ uint64_t& OutCompressedSize,
+ uint64_t& OutDecompressedSize)
+ {
+ IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash);
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash));
+ }
+ return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
+ }
+
+ ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload,
const IoHash& BlobHash,
uint64_t& OutCompressedSize,
uint64_t& OutDecompressedSize)
{
- CompositeBuffer BlockBuffer = ValidateBlob(Storage, BuildId, BlobHash, OutCompressedSize, OutDecompressedSize);
+ CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash);
}
@@ -1097,11 +1138,12 @@ namespace {
{
auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
- uint32_t ChunkIndex = It->second;
- std::span<const ChunkedContentLookup::ChunkLocation> ChunkLocations = GetChunkLocations(Lookup, ChunkIndex);
+ uint32_t ChunkIndex = It->second;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
ZEN_ASSERT(!ChunkLocations.empty());
- CompositeBuffer Chunk =
- OpenFileCache.GetRange(ChunkLocations[0].PathIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
return Chunk;
};
@@ -1113,7 +1155,7 @@ namespace {
ChunkBlockDescription& OutBlockDescription,
DiskStatistics& DiskStats)
{
- ReadFileCache OpenFileCache(DiskStats, Path, Content.Paths, Content.RawSizes, 4);
+ ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);
std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
BlockContent.reserve(ChunksInBlock.size());
@@ -1135,6 +1177,187 @@ namespace {
return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
};
+ void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
+ {
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() {
+ ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
+ BuildId,
+ BuildPartId,
+ BuildPartName,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ CbObject Build = Storage.GetBuild(BuildId);
+ if (!BuildPartName.empty())
+ {
+ BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId();
+ if (BuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
+ }
+ }
+ CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
+ std::vector<IoHash> ChunkAttachments;
+ for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
+ {
+ ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
+ }
+ std::vector<IoHash> BlockAttachments;
+ for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
+ {
+ BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
+ }
+
+ std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
+ if (VerifyBlockDescriptions.size() != BlockAttachments.size())
+ {
+ throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
+ BlockAttachments.size() - VerifyBlockDescriptions.size()));
+ }
+
+ WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ ParallellWork Work(AbortFlag);
+
+ ProgressBar ProgressBar(UsePlainProgress);
+
+ uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+ std::atomic<uint64_t> DownloadedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedByteCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredVerifiedBytesPerSecond;
+
+ for (const IoHash& ChunkAttachment : ChunkAttachments)
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&, ChunkAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer Payload = Storage.GetBuildBlob(BuildId, ChunkAttachment);
+ DownloadedAttachmentCount++;
+ DownloadedByteCount += Payload.GetSize();
+ if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Chunk attachment {} could not be found", ChunkAttachment));
+ }
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ VerifyPool,
+ [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateBlob(IoBuffer(Payload), ChunkAttachment, CompressedSize, DecompressedSize);
+ ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid",
+ ChunkAttachment,
+ NiceBytes(CompressedSize),
+ NiceBytes(DecompressedSize));
+ VerifiedAttachmentCount++;
+ VerifiedByteCount += DecompressedSize;
+ if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
+ for (const IoHash& BlockAttachment : BlockAttachments)
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&, BlockAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
+ DownloadedAttachmentCount++;
+ DownloadedByteCount += Payload.GetSize();
+ if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+ }
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ VerifyPool,
+ [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateChunkBlock(IoBuffer(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+ ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid",
+ BlockAttachment,
+ NiceBytes(CompressedSize),
+ NiceBytes(DecompressedSize));
+ VerifiedAttachmentCount++;
+ VerifiedByteCount += DecompressedSize;
+ if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+
+ FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
+ FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount);
+
+ std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
+ DownloadedAttachmentCount.load(),
+ AttachmentsToVerifyCount,
+ NiceBytes(DownloadedByteCount.load()),
+ NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ VerifiedAttachmentCount.load(),
+ AttachmentsToVerifyCount,
+ NiceBytes(VerifiedByteCount.load()),
+ NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
+
+ ProgressBar.UpdateState(
+ {.Task = "Validating blobs ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
+ .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
+ (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))},
+ false);
+ });
+
+ ProgressBar.Finish();
+ }
+
void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
uint64_t MaxBlockSize,
@@ -1142,13 +1365,13 @@ namespace {
std::vector<std::vector<uint32_t>>& OutBlocks)
{
std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
- const ChunkedContentLookup::ChunkLocation& LhsLocation = GetChunkLocations(Lookup, Lhs)[0];
- const ChunkedContentLookup::ChunkLocation& RhsLocation = GetChunkLocations(Lookup, Rhs)[0];
- if (LhsLocation.PathIndex < RhsLocation.PathIndex)
+ const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
+ const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
+ if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
{
return true;
}
- else if (LhsLocation.PathIndex > RhsLocation.PathIndex)
+ else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
{
return false;
}
@@ -1171,7 +1394,8 @@ namespace {
// between source paths for chunks. Break the block at the last such break if any.
ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);
- const uint32_t ChunkPathIndex = Lookup.ChunkLocations[Lookup.ChunkLocationOffset[ChunkIndex]].PathIndex;
+ const uint32_t ChunkSequenceIndex =
+ Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;
uint64_t ScanBlockSize = BlockSize;
@@ -1185,8 +1409,9 @@ namespace {
break;
}
- const uint32_t TestPathIndex = Lookup.ChunkLocations[Lookup.ChunkLocationOffset[TestChunkIndex]].PathIndex;
- if (ChunkPathIndex != TestPathIndex)
+ const uint32_t TestSequenceIndex =
+ Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
+ if (ChunkSequenceIndex != TestSequenceIndex)
{
ChunkIndexOffset = ScanChunkIndexOffset + 1;
break;
@@ -1235,10 +1460,9 @@ namespace {
const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- const ChunkedContentLookup::ChunkLocation& Source = GetChunkLocations(Lookup, ChunkIndex)[0];
-
- IoBuffer RawSource =
- IoBufferBuilder::MakeFromFile((Path / Content.Paths[Source.PathIndex]).make_preferred(), Source.Offset, ChunkSize);
+ const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
+ const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
+ IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
if (!RawSource)
{
throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
@@ -1418,7 +1642,7 @@ namespace {
FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
- std::string Details = fmt::format("Generated {}/{} ({}, {}B/s) and uploaded {}/{} ({}, {}bits/s) blocks",
+ std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
GenerateBlocksStats.GeneratedBlockCount.load(),
NewBlockCount,
NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
@@ -1470,10 +1694,11 @@ namespace {
ParallellWork Work(AbortFlag);
- std::atomic<size_t> UploadedBlockSize = 0;
- std::atomic<size_t> UploadedBlockCount = 0;
- std::atomic<size_t> UploadedChunkSize = 0;
- std::atomic<uint32_t> UploadedChunkCount = 0;
+ std::atomic<size_t> UploadedBlockSize = 0;
+ std::atomic<size_t> UploadedBlockCount = 0;
+ std::atomic<size_t> UploadedRawChunkSize = 0;
+ std::atomic<size_t> UploadedCompressedChunkSize = 0;
+ std::atomic<uint32_t> UploadedChunkCount = 0;
tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex;
ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size());
@@ -1485,8 +1710,8 @@ namespace {
std::vector<size_t> BlockIndexes;
std::vector<uint32_t> LooseChunkOrderIndexes;
- uint64_t TotalChunksSize = 0;
- uint64_t TotalBlocksSize = 0;
+ uint64_t TotalLooseChunksSize = 0;
+ uint64_t TotalBlocksSize = 0;
for (const IoHash& RawHash : RawHashes)
{
if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end())
@@ -1501,11 +1726,11 @@ namespace {
LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end())
{
LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second);
- TotalChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
}
}
}
- uint64_t TotalSize = TotalChunksSize + TotalBlocksSize;
+ uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize;
const size_t UploadBlockCount = BlockIndexes.size();
const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
@@ -1548,10 +1773,10 @@ namespace {
Work.DefaultErrorFunction());
};
- auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, CompositeBuffer&& Payload) {
+ auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
Work.ScheduleWork(
UploadChunkPool,
- [&, RawHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) {
+ [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) {
if (!AbortFlag)
{
const uint64_t PayloadSize = Payload.GetSize();
@@ -1571,9 +1796,9 @@ namespace {
PartPayload.SetContentType(ZenContentType::kBinary);
return PartPayload;
},
- [&](uint64_t SentBytes, bool IsComplete) {
+ [&, RawSize](uint64_t SentBytes, bool IsComplete) {
UploadStats.ChunksBytes += SentBytes;
- UploadedChunkSize += SentBytes;
+ UploadedCompressedChunkSize += SentBytes;
if (IsComplete)
{
UploadStats.ChunkCount++;
@@ -1582,6 +1807,7 @@ namespace {
{
FilteredUploadedBytesPerSecond.Stop();
}
+ UploadedRawChunkSize += RawSize;
}
});
for (auto& WorkPart : MultipartWork)
@@ -1604,7 +1830,8 @@ namespace {
ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
UploadStats.ChunksBytes += Payload.GetSize();
UploadStats.ChunkCount++;
- UploadedChunkSize += Payload.GetSize();
+ UploadedCompressedChunkSize += Payload.GetSize();
+ UploadedRawChunkSize += RawSize;
UploadedChunkCount++;
if (UploadedChunkCount == UploadChunkCount)
{
@@ -1696,6 +1923,7 @@ namespace {
std::atomic<uint64_t> CompressedLooseChunkCount = 0;
std::atomic<uint64_t> CompressedLooseChunkByteCount = 0;
+ std::atomic<uint64_t> RawLooseChunkByteCount = 0;
// Start compression of any non-precompressed loose chunks and schedule upload
for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes)
@@ -1712,18 +1940,20 @@ namespace {
Content.ChunkedContent.ChunkHashes[ChunkIndex],
NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
NiceBytes(Payload.GetSize()));
- UploadStats.ReadFromDiskBytes += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ UploadStats.ReadFromDiskBytes += ChunkRawSize;
LooseChunksStats.CompressedChunkBytes += Payload.GetSize();
LooseChunksStats.CompressedChunkCount++;
CompressedLooseChunkByteCount += Payload.GetSize();
CompressedLooseChunkCount++;
+ RawLooseChunkByteCount += ChunkRawSize;
if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size())
{
FilteredCompressedBytesPerSecond.Stop();
}
if (!AbortFlag)
{
- AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], std::move(Payload));
+ AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
}
}
},
@@ -1734,22 +1964,31 @@ namespace {
ZEN_UNUSED(IsAborted, PendingWork);
FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
- FilteredUploadedBytesPerSecond.Update(UploadedChunkSize.load() + UploadedBlockSize.load());
- uint64_t UploadedSize = UploadedChunkSize.load() + UploadedBlockSize.load();
+ FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
+ uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
+ uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
+
+ std::string Details = fmt::format(
+ "Compressed {}/{} ({}/{}) chunks. "
+ "Uploaded {}/{} ({}/{}) blobs "
+ "({} {}bits/s)",
+ CompressedLooseChunkCount.load(),
+ CompressLooseChunkOrderIndexes.size(),
+ NiceBytes(RawLooseChunkByteCount),
+ NiceBytes(TotalLooseChunksSize),
+
+ UploadedBlockCount.load() + UploadedChunkCount.load(),
+ UploadBlockCount + UploadChunkCount,
+ NiceBytes(UploadedRawSize),
+ NiceBytes(TotalRawSize),
+
+ NiceBytes(UploadedCompressedSize),
+ NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()));
+
ProgressBar.UpdateState({.Task = "Uploading blobs ",
- .Details = fmt::format("Compressed {}/{} chunks. "
- "Uploaded {}/{} blobs ({}/{} {}bits/s)",
- CompressedLooseChunkCount.load(),
- CompressLooseChunkOrderIndexes.size(),
-
- UploadedBlockCount.load() + UploadedChunkCount.load(),
- UploadBlockCount + UploadChunkCount,
-
- NiceBytes(UploadedChunkSize.load() + UploadedBlockSize.load()),
- NiceBytes(TotalSize),
- NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())),
- .TotalCount = gsl::narrow<uint64_t>(TotalSize),
- .RemainingCount = gsl::narrow<uint64_t>(TotalSize - UploadedSize)},
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)},
false);
});
@@ -1930,7 +2169,8 @@ namespace {
bool AllowMultiparts,
const CbObject& MetaData,
bool CreateBuild,
- bool IgnoreExistingBlocks)
+ bool IgnoreExistingBlocks,
+ bool PostUploadVerify)
{
Stopwatch ProcessTimer;
@@ -2121,15 +2361,16 @@ namespace {
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ Content.Paths.size(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(TotalRawSize),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
ProgressBar.UpdateState({.Task = "Scanning files ",
- .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- Content.Paths.size(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- NiceBytes(TotalRawSize),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load())),
+ .Details = Details,
.TotalCount = TotalRawSize,
.RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()},
false);
@@ -2422,19 +2663,18 @@ namespace {
std::vector<IoHash> OutLooseChunkHashes;
std::vector<uint64_t> OutLooseChunkRawSizes;
std::vector<IoHash> OutBlockRawHashes;
-
- ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
- VerifyFolderContent.Platform,
- VerifyFolderContent.Paths,
- VerifyFolderContent.RawHashes,
- VerifyFolderContent.RawSizes,
- VerifyFolderContent.Attributes,
- VerifyFolderContent.ChunkedContent.SequenceRawHashes,
- VerifyFolderContent.ChunkedContent.ChunkCounts,
- OutAbsoluteChunkOrders,
- OutLooseChunkHashes,
- OutLooseChunkRawSizes,
- OutBlockRawHashes);
+ 4 ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
+ VerifyFolderContent.Platform,
+ VerifyFolderContent.Paths,
+ VerifyFolderContent.RawHashes,
+ VerifyFolderContent.RawSizes,
+ VerifyFolderContent.Attributes,
+ VerifyFolderContent.ChunkedContent.SequenceRawHashes,
+ VerifyFolderContent.ChunkedContent.ChunkCounts,
+ OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ OutBlockRawHashes);
ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
@@ -2483,7 +2723,7 @@ namespace {
Stopwatch PutBuildPartResultTimer;
std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
- ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are missing.",
+ ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.",
NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
NiceBytes(PartManifest.GetSize()),
PutBuildPartResult.second.size());
@@ -2504,7 +2744,7 @@ namespace {
ZEN_CONSOLE(
"Generated {} ({} {}B/s) and uploaded {} ({}) blocks. "
"Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
- "Transferred {} ({}B/s) in {}",
+ "Transferred {} ({}bits/s) in {}",
TempGenerateBlocksStats.GeneratedBlockCount.load(),
NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()),
NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
@@ -2571,7 +2811,7 @@ namespace {
UploadAttachments(PutBuildPartResult.second);
}
- while (true)
+ while (!AbortFlag)
{
Stopwatch FinalizeBuildPartTimer;
std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash);
@@ -2586,14 +2826,14 @@ namespace {
UploadAttachments(Needs);
}
- if (CreateBuild)
+ if (CreateBuild && !AbortFlag)
{
Stopwatch FinalizeBuildTimer;
Storage.FinalizeBuild(BuildId);
ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
}
- if (!NewBlocks.BlockDescriptions.empty())
+ if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag)
{
uint64_t UploadBlockMetadataCount = 0;
std::vector<IoHash> BlockHashes;
@@ -2619,13 +2859,11 @@ namespace {
UploadStats.ElapsedWallTimeUS += ElapsedUS;
ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000));
}
+ }
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockHashes);
- if (VerifyBlockDescriptions.size() != BlockHashes.size())
- {
- throw std::runtime_error(fmt::format("Uploaded blocks could not all be found, {} blocks are missing",
- BlockHashes.size() - VerifyBlockDescriptions.size()));
- }
+ if (PostUploadVerify && !AbortFlag)
+ {
+ ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName);
}
const double DeltaByteCountPercent =
@@ -2786,7 +3024,7 @@ namespace {
NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()));
}
- void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path)
+ void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash)
{
ProgressBar ProgressBar(UsePlainProgress);
std::atomic<uint64_t> FilesVerified(0);
@@ -2879,18 +3117,18 @@ namespace {
});
FilesFailed++;
}
- else if (SizeOnDisk > 0)
+ else if (SizeOnDisk > 0 && VerifyFileHash)
{
const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex];
IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath);
IoHash RawHash = IoHash::HashBuffer(Buffer);
if (RawHash != ExpectedRawHash)
{
- uint64_t FileOffset = 0;
- const uint32_t SequenceRawHashesIndex = Lookup.RawHashToSequenceRawHashIndex.at(ExpectedRawHash);
- const uint32_t OrderOffset = Lookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashesIndex];
+ uint64_t FileOffset = 0;
+ const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash);
+ const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex];
for (uint32_t OrderIndex = OrderOffset;
- OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceRawHashesIndex];
+ OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex];
OrderIndex++)
{
uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
@@ -2933,12 +3171,13 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
+ std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
+ FilesVerified.load(),
+ PathCount,
+ NiceBytes(ReadBytes.load()),
+ FilesFailed.load());
ProgressBar.UpdateState({.Task = "Verifying files ",
- .Details = fmt::format("Verified {} files out of {}. Verfied: {}. Failed files: {}",
- FilesVerified.load(),
- PathCount,
- NiceBytes(ReadBytes.load()),
- FilesFailed.load()),
+ .Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
.RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())},
false);
@@ -3018,19 +3257,19 @@ namespace {
std::unique_ptr<BasicFileWriter> OpenFileWriter;
};
- std::vector<const ChunkedContentLookup::ChunkLocation*> GetRemainingChunkTargets(
- const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags,
- const ChunkedContentLookup& Lookup,
- uint32_t ChunkIndex)
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( // Returns pointers to the sequence locations of chunk ChunkIndex whose target sequence still has chunks left to write
+ std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, // per-sequence counters, indexed by ChunkSequenceLocation::SequenceIndex
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex)
{
- std::span<const ChunkedContentLookup::ChunkLocation> ChunkSources = GetChunkLocations(Lookup, ChunkIndex);
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); // locations the lookup reports for this chunk; the returned pointers refer to elements of this span
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; // stays empty when there are no locations or none of them is still pending
if (!ChunkSources.empty())
{
ChunkTargetPtrs.reserve(ChunkSources.size());
- for (const ChunkedContentLookup::ChunkLocation& Source : ChunkSources)
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
{
- if (!RemotePathIndexWantsCopyFromCacheFlags[Source.PathIndex])
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) // keep only targets whose sequence counter is still above zero
{
ChunkTargetPtrs.push_back(&Source);
}
@@ -3039,20 +3278,20 @@ namespace {
return ChunkTargetPtrs;
};
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& Content,
- const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags,
- const CompositeBuffer& DecompressedBlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags,
- uint32_t& OutChunksComplete,
- uint64_t& OutBytesWritten)
+ bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const CompositeBuffer& DecompressedBlockBuffer,
+ const ChunkedContentLookup& Lookup,
+ std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags,
+ uint32_t& OutChunksComplete,
+ uint64_t& OutBytesWritten)
{
std::vector<CompositeBuffer> ChunkBuffers;
struct WriteOpData
{
- const ChunkedContentLookup::ChunkLocation* Target;
- size_t ChunkBufferIndex;
+ const ChunkedContentLookup::ChunkSequenceLocation* Target;
+ size_t ChunkBufferIndex;
};
std::vector<WriteOpData> WriteOps;
@@ -3063,9 +3302,9 @@ namespace {
[&](CompressedBuffer&& Chunk, const IoHash& ChunkHash) {
if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
{
- const uint32_t ChunkIndex = It->second;
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, Lookup, ChunkIndex);
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
if (!ChunkTargetPtrs.empty())
{
@@ -3078,8 +3317,8 @@ namespace {
throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
}
ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
- ZEN_ASSERT(Decompressed.GetSize() == Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- for (const ChunkedContentLookup::ChunkLocation* Target : ChunkTargetPtrs)
+ ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()});
}
@@ -3095,11 +3334,11 @@ namespace {
if (!AbortFlag)
{
std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) {
- if (Lhs.Target->PathIndex < Rhs.Target->PathIndex)
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
{
return true;
}
- if (Lhs.Target->PathIndex > Rhs.Target->PathIndex)
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
{
return false;
}
@@ -3114,23 +3353,50 @@ namespace {
{
break;
}
- const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t PathIndex = WriteOp.Target->PathIndex;
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]);
+ const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+ const uint64_t ChunkSize = Chunk.GetSize();
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
OpenFileCache.WriteToFile<CompositeBuffer>(
- PathIndex,
- [&CacheFolderPath, &Content](uint32_t TargetIndex) {
- return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred();
+ SequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
},
Chunk,
FileOffset,
- Content.RawSizes[PathIndex]);
+ RemoteContent.RawSizes[PathIndex]);
OutBytesWritten += ChunkSize;
}
}
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const WriteOpData& WriteOp : WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ {
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("Written hunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
+ }
+ }
OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size());
}
}
@@ -3171,50 +3437,52 @@ namespace {
return Decompressed;
}
- void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& Content,
- std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargets,
- const CompositeBuffer& ChunkData,
- WriteFileCache& OpenFileCache,
- uint64_t& OutBytesWritten)
+ void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, // Writes ChunkData once per entry in ChunkTargets, at that target's offset, into the target sequence's temp file under CacheFolderPath
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup, // only SequenceIndexFirstPathIndex is read from the lookup here
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
+ const CompositeBuffer& ChunkData,
+ WriteFileCache& OpenFileCache,
+ uint64_t& OutBytesWritten) // incremented by ChunkData.GetSize() for every target written
{
- for (const ChunkedContentLookup::ChunkLocation* TargetPtr : ChunkTargets)
+ for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets)
{
- const auto& Target = *TargetPtr;
- const uint64_t FileOffset = Target.Offset;
+ const auto& Target = *TargetPtr;
+ const uint64_t FileOffset = Target.Offset;
+ const uint32_t SequenceIndex = Target.SequenceIndex;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; // a path index for this sequence; used below only to read Content.RawSizes
OpenFileCache.WriteToFile(
- Target.PathIndex,
- [&CacheFolderPath, &Content](uint32_t TargetIndex) {
- return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred();
- // return (Path / Content.Paths[TargetIndex]).make_preferred();
+ SequenceIndex, // the write is keyed by sequence index rather than path index
+ [&CacheFolderPath, &Content](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); // resolves to the temp (not final) file name derived from the sequence raw hash
},
ChunkData,
FileOffset,
- Content.RawSizes[Target.PathIndex]);
+ Content.RawSizes[PathIndex]); // NOTE(review): presumably the total size of the target file - confirm against WriteFileCache::WriteToFile
OutBytesWritten += ChunkData.GetSize();
}
}
- void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& TempFolderPath,
- const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& RemoteLookup,
- const Oid& BuildId,
- const IoHash& ChunkHash,
- const std::uint64_t PreferredMultipartChunkSize,
- const std::vector<const ChunkedContentLookup::ChunkLocation*>& ChunkTargetPtrs,
- ParallellWork& Work,
- WorkerThreadPool& WritePool,
- WorkerThreadPool& NetworkPool,
- std::atomic<uint64_t>& BytesWritten,
- std::atomic<uint64_t>& WriteToDiskBytes,
- std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& LooseChunksBytes,
- std::atomic<uint64_t>& DownloadedChunks,
- std::atomic<uint32_t>& ChunksComplete,
- std::atomic<uint64_t>& MultipartAttachmentCount)
+ void DownloadLargeBlob(BuildStorage& Storage,
+ const std::filesystem::path& TempFolderPath,
+ const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& RemoteLookup,
+ const Oid& BuildId,
+ const IoHash& ChunkHash,
+ const std::uint64_t PreferredMultipartChunkSize,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& WritePool,
+ WorkerThreadPool& NetworkPool,
+ std::atomic<uint64_t>& WriteToDiskBytes,
+ std::atomic<uint64_t>& BytesDownloaded,
+ std::atomic<uint64_t>& LooseChunksBytes,
+ std::atomic<uint64_t>& DownloadedChunks,
+ std::atomic<uint32_t>& ChunksComplete,
+ std::atomic<uint64_t>& MultipartAttachmentCount)
{
struct WorkloadData
{
@@ -3242,11 +3510,11 @@ namespace {
ChunkHash,
&BytesDownloaded,
&LooseChunksBytes,
- &BytesWritten,
&WriteToDiskBytes,
&DownloadedChunks,
&ChunksComplete,
- ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkLocation*>(
+ SequenceIndexChunksLeftToWriteCounters,
+ ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
BytesDownloaded += Chunk.GetSize();
LooseChunksBytes += Chunk.GetSize();
@@ -3268,8 +3536,8 @@ namespace {
Offset,
BytesRemaining,
&ChunksComplete,
- &BytesWritten,
&WriteToDiskBytes,
+ SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3289,7 +3557,9 @@ namespace {
uint64_t TotalBytesWritten = 0;
- uint32_t ChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash);
+ auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
ChunkHash,
@@ -3304,14 +3574,38 @@ namespace {
WriteChunkToDisk(CacheFolderPath,
RemoteContent,
+ RemoteLookup,
ChunkTargetPtrs,
CompositeBuffer(Chunk),
OpenFileCache,
TotalBytesWritten);
ChunksComplete++;
- BytesWritten += TotalBytesWritten;
WriteToDiskBytes += TotalBytesWritten;
}
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ {
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
+ }
+ }
}
},
Work.DefaultErrorFunction());
@@ -3368,45 +3662,37 @@ namespace {
const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToLocalPathIndex;
+ Stopwatch CacheMappingTimer;
- {
- Stopwatch CacheTimer;
+ uint64_t CacheMappedBytesForReuse = 0;
+ std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
+ // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false);
+ std::vector<bool> RemoteChunkIndexIsCachedFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ // Guard if the same chunk is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
+ // Pick up all whole files we can use from current local state
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end())
{
- ZEN_ASSERT_SLOW(std::filesystem::exists(Path / LocalContent.Paths[LocalPathIndex]));
-
- if (LocalContent.RawSizes[LocalPathIndex] > 0)
- {
- const uint32_t SequenceRawHashIndex =
- LocalLookup.RawHashToSequenceRawHashIndex.at(LocalContent.RawHashes[LocalPathIndex]);
- uint32_t ChunkCount = LocalContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
- if (ChunkCount > 0)
- {
- const IoHash LocalRawHash = LocalContent.RawHashes[LocalPathIndex];
- if (!RawHashToLocalPathIndex.contains(LocalRawHash))
- {
- RawHashToLocalPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex);
- }
- }
- }
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
+ const uint32_t RemotePathIndex = GetFirstPathIndexForRawHash(RemoteLookup, RemoteSequenceRawHash);
+ CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex];
+ }
+ else
+ {
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
}
}
- Stopwatch CacheMappingTimer;
-
- std::atomic<uint64_t> BytesWritten = 0;
- uint64_t CacheMappedBytesForReuse = 0;
-
- std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false);
- std::vector<bool> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ // Pick up all chunks in current local state
struct CacheCopyData
{
- uint32_t LocalPathIndex;
- std::vector<const ChunkedContentLookup::ChunkLocation*> TargetChunkLocationPtrs;
+ uint32_t LocalSequenceIndex;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
struct ChunkTarget
{
uint32_t TargetChunkLocationCount;
@@ -3418,43 +3704,30 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
- uint32_t ChunkCountToWrite = 0;
- // Pick up all whole files we can use from current local state
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size();
+ LocalSequenceIndex++)
{
- const IoHash& RemoteRawHash = RemoteContent.RawHashes[RemotePathIndex];
- if (auto It = RawHashToLocalPathIndex.find(RemoteRawHash); It != RawHashToLocalPathIndex.end())
- {
- RemotePathIndexWantsCopyFromCacheFlags[RemotePathIndex] = true;
- CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex];
- }
- }
-
- // Pick up all chunks in current local state
- for (auto& CachedLocalFile : RawHashToLocalPathIndex)
- {
- const IoHash& LocalFileRawHash = CachedLocalFile.first;
- const uint32_t LocalPathIndex = CachedLocalFile.second;
- const uint32_t LocalSequenceRawHashIndex = LocalLookup.RawHashToSequenceRawHashIndex.at(LocalFileRawHash);
- const uint32_t LocalOrderOffset = LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex];
+ const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
+ const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
{
uint64_t SourceOffset = 0;
- const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceRawHashIndex];
+ const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
{
const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+
if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
{
const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex])
+ if (!RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
{
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex);
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
if (!ChunkTargetPtrs.empty())
{
@@ -3462,7 +3735,7 @@ namespace {
.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
.ChunkRawSize = LocalChunkRawSize,
.CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalFileRawHash);
+ if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
CopySourceIt != RawHashToCacheCopyDataIndex.end())
{
CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
@@ -3473,14 +3746,14 @@ namespace {
}
else
{
- RawHashToCacheCopyDataIndex.insert_or_assign(LocalFileRawHash, CacheCopyDatas.size());
+ RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
CacheCopyDatas.push_back(
- CacheCopyData{.LocalPathIndex = LocalPathIndex,
+ CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex,
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
}
CacheMappedBytesForReuse += LocalChunkRawSize;
- RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true;
+ RemoteChunkIndexIsCachedFlags[RemoteChunkIndex] = true;
}
}
}
@@ -3489,16 +3762,24 @@ namespace {
}
}
+ if (CacheMappedBytesForReuse > 0)
+ {
+ ZEN_CONSOLE("Mapped {} cached data for reuse in {}",
+ NiceBytes(CacheMappedBytesForReuse),
+ NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs()));
+ }
+
+ uint32_t ChunkCountToWrite = 0;
for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex])
+ if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
{
ChunkCountToWrite++;
}
else
{
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex);
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
if (!ChunkTargetPtrs.empty())
{
RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
@@ -3506,12 +3787,13 @@ namespace {
}
}
}
+
std::atomic<uint32_t> ChunkCountWritten = 0;
- ZEN_CONSOLE("Mapped {} cached data for reuse in {}",
- NiceBytes(CacheMappedBytesForReuse),
- NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs()));
{
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
+
WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -3520,110 +3802,6 @@ namespace {
std::atomic<uint64_t> BytesDownloaded = 0;
- for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
- [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- const std::filesystem::path LocalFilePath =
- (Path / LocalContent.Paths[CopyData.LocalPathIndex]).make_preferred();
- if (!CopyData.TargetChunkLocationPtrs.empty())
- {
- uint64_t CacheLocalFileBytesRead = 0;
-
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
-
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkLocation* Target;
- uint64_t CacheFileOffset;
- uint64_t ChunkSize;
- };
-
- std::vector<WriteOp> WriteOps;
- WriteOps.reserve(AllTargets.size());
-
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
- {
- std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
- }
-
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->PathIndex < Rhs.Target->PathIndex)
- {
- return true;
- }
- else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
-
- {
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
- {
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemotePathIndex = Op.Target->PathIndex;
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemotePathIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t TargetIndex) {
- return (CacheFolderPath / RemoteContent.RawHashes[TargetIndex].ToHexString())
- .make_preferred();
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- BytesWritten += ChunkSize;
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
- }
- }
- if (!AbortFlag)
- {
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}",
- NiceBytes(CacheLocalFileBytesRead),
- LocalContent.Paths[CopyData.LocalPathIndex]);
- }
- }
- }
- },
- Work.DefaultErrorFunction());
- }
-
for (const IoHash ChunkHash : LooseChunkHashes)
{
if (AbortFlag)
@@ -3631,8 +3809,10 @@ namespace {
break;
}
- uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash);
- if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex])
+ auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+ if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
{
ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
continue;
@@ -3640,8 +3820,8 @@ namespace {
bool NeedsCopy = true;
if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
{
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex);
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
if (ChunkTargetPtrs.empty())
{
@@ -3654,6 +3834,7 @@ namespace {
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
if (!AbortFlag)
{
+ FilteredDownloadedBytesPerSecond.Start();
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
DownloadLargeBlob(Storage,
@@ -3665,10 +3846,10 @@ namespace {
ChunkHash,
PreferredMultipartChunkSize,
ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
Work,
WritePool,
NetworkPool,
- BytesWritten,
WriteToDiskBytes,
BytesDownloaded,
LooseChunksBytes,
@@ -3698,6 +3879,7 @@ namespace {
std::atomic<bool>&) {
if (!AbortFlag)
{
+ FilteredWrittenBytesPerSecond.Start();
uint64_t TotalBytesWritten = 0;
SharedBuffer Chunk =
Decompress(CompressedPart,
@@ -3708,14 +3890,45 @@ namespace {
WriteFileCache OpenFileCache;
WriteChunkToDisk(CacheFolderPath,
RemoteContent,
+ RemoteLookup,
ChunkTargetPtrs,
CompositeBuffer(Chunk),
OpenFileCache,
TotalBytesWritten);
}
- ChunkCountWritten++;
- BytesWritten += TotalBytesWritten;
- WriteToDiskBytes += TotalBytesWritten;
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open
+ // (WriteFileCache)
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location :
+ ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(
+ 1) == 1)
+ {
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath,
+ SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format(
+ "Written hunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ std::filesystem::rename(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
+ }
+
+ ChunkCountWritten++;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
}
},
Work.DefaultErrorFunction());
@@ -3728,6 +3941,139 @@ namespace {
}
}
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(),//
+ [&, CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ FilteredWrittenBytesPerSecond.Start();
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
+ const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ if (!CopyData.TargetChunkLocationPtrs.empty())
+ {
+ uint64_t CacheLocalFileBytesRead = 0;
+
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target;
+ uint64_t CacheFileOffset;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<WriteOp> WriteOps;
+ WriteOps.reserve(AllTargets.size());
+
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+
+ if (!AbortFlag)
+ {
+ BufferedOpenFile SourceFile(LocalFilePath);
+ WriteFileCache OpenFileCache;
+ for (const WriteOp& Op : WriteOps)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = Op.ChunkSize;
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
+
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ WriteToDiskBytes += ChunkSize;
+ CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
+ }
+ }
+ if (!AbortFlag)
+ {
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const WriteOp& Op : WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ {
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ // The completed temp file must hash to the raw hash recorded for the
+ // sequence; on mismatch, throw before the file is renamed to its final name.
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
+ }
+ }
+
+ ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
size_t BlockCount = BlockDescriptions.size();
std::atomic<size_t> BlocksComplete = 0;
@@ -3762,6 +4108,7 @@ namespace {
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ FilteredDownloadedBytesPerSecond.Start();
IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
if (!BlockBuffer)
{
@@ -3781,6 +4128,7 @@ namespace {
[&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
if (!AbortFlag)
{
+ FilteredWrittenBytesPerSecond.Start();
IoHash BlockRawHash;
uint64_t BlockRawSize;
CompressedBuffer CompressedBlockBuffer =
@@ -3812,14 +4160,13 @@ namespace {
uint32_t ChunksReadFromBlock = 0;
if (WriteBlockToDisk(CacheFolderPath,
RemoteContent,
- RemotePathIndexWantsCopyFromCacheFlags,
+ SequenceIndexChunksLeftToWriteCounters,
DecompressedBlockBuffer,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
ChunksReadFromBlock,
BytesWrittenToDisk))
{
- BytesWritten += BytesWrittenToDisk;
WriteToDiskBytes += BytesWrittenToDisk;
ChunkCountWritten += ChunksReadFromBlock;
}
@@ -3851,20 +4198,27 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- WriteProgressBar.UpdateState(
- {.Task = "Writing chunks ",
- .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}",
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- BlocksComplete.load(),
- BlocksNeededCount,
- NiceBytes(BytesDownloaded.load()),
- NiceBytes(BytesWritten.load())),
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
+ FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
+ FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
+ std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written",
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
+ BlocksComplete.load(),
+ BlocksNeededCount,
+ NiceBytes(BytesDownloaded.load()),
+ NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ NiceBytes(WriteToDiskBytes.load()),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
+ WriteProgressBar.UpdateState({.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
+ false);
});
+ FilteredWrittenBytesPerSecond.Stop();
+ FilteredDownloadedBytesPerSecond.Stop();
+
if (AbortFlag)
{
return;
@@ -3873,6 +4227,11 @@ namespace {
WriteProgressBar.Finish();
}
+ for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters)
+ {
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() == 0);
+ }
+
std::vector<std::pair<IoHash, uint32_t>> Targets;
Targets.reserve(RemoteContent.Paths.size());
for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
@@ -3884,13 +4243,13 @@ namespace {
});
// Move all files we will reuse to cache folder
- for (auto It : RawHashToLocalPathIndex)
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
- const IoHash& RawHash = It.first;
- if (RemoteLookup.RawHashToSequenceRawHashIndex.contains(RawHash))
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
+ if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
{
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[It.second]).make_preferred();
- const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath));
SetFileReadOnly(LocalFilePath, false);
std::filesystem::rename(LocalFilePath, CacheFilePath);
@@ -3989,7 +4348,7 @@ namespace {
}
else
{
- const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath));
CreateDirectories(FirstTargetFilePath.parent_path());
if (std::filesystem::exists(FirstTargetFilePath))
@@ -4045,12 +4404,12 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- RebuildProgressBar.UpdateState(
- {.Task = "Rebuilding state ",
- .Details = fmt::format("Written {} files out of {}", TargetsComplete.load(), Targets.size()),
- .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
- .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
- false);
+ std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size());
+ RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
+ .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
+ false);
});
if (AbortFlag)
@@ -4487,20 +4846,19 @@ namespace {
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
-
- ProgressBar.UpdateState(
- {.Task = "Scanning files ",
- .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- UpdatedContent.Paths.size(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- NiceBytes(ByteCountToScan),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load())),
- .TotalCount = ByteCountToScan,
- .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
- false);
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ UpdatedContent.Paths.size(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(ByteCountToScan),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
+ ProgressBar.UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = ByteCountToScan,
+ .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
+ false);
},
AbortFlag);
if (AbortFlag)
@@ -4567,15 +4925,16 @@ namespace {
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ // Progress text: files processed / total files, bytes hashed / bytes to scan
+ // (both human readable), hashing rate, and unique chunks found so far.
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ CurrentLocalFolderContent.Paths.size(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(ByteCountToScan),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
ProgressBar.UpdateState({.Task = "Scanning files ",
- .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- CurrentLocalFolderContent.Paths.size(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- ByteCountToScan,
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load())),
+ .Details = Details,
.TotalCount = ByteCountToScan,
.RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())},
false);
@@ -4599,7 +4958,8 @@ namespace {
std::span<const std::string> BuildPartNames,
const std::filesystem::path& Path,
bool AllowMultiparts,
- bool WipeTargetFolder)
+ bool WipeTargetFolder,
+ bool PostDownloadVerify)
{
Stopwatch DownloadTimer;
@@ -4610,8 +4970,10 @@ namespace {
std::filesystem::remove(ZenTempFolder);
});
CreateDirectories(Path / ZenTempBlockFolderName);
- CreateDirectories(Path / ZenTempChunkFolderName);
- CreateDirectories(Path / ZenTempCacheFolderName);
+ CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use
+ CreateDirectories(Path /
+ ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and
+ // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -4724,7 +5086,7 @@ namespace {
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path);
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
@@ -5078,6 +5440,8 @@ BuildsCommand::BuildsCommand()
"Path to a text file with one line of <local path>[TAB]<modification date> per file to include.",
cxxopts::value(m_ManifestPath),
"<manifestpath>");
+ m_UploadOptions
+ .add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), "<verify>");
m_UploadOptions.parse_positional({"local-path", "build-id"});
m_UploadOptions.positional_help("local-path build-id");
@@ -5111,6 +5475,8 @@ BuildsCommand::BuildsCommand()
"Allow large attachments to be transfered using multipart protocol. Defaults to true.",
cxxopts::value(m_AllowMultiparts),
"<allowmultipart>");
+ m_DownloadOptions
+ .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>");
m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"});
m_DownloadOptions.positional_help("local-path build-id build-part-name");
@@ -5503,7 +5869,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
MetaData,
m_CreateBuild,
- m_Clean);
+ m_Clean,
+ m_PostUploadVerify);
if (false)
{
@@ -5593,7 +5960,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
- DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean);
+ DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify);
if (false)
{
@@ -5727,8 +6094,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData,
- m_CreateBuild,
- m_Clean);
+ true,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Upload failed.");
@@ -5737,7 +6105,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -5749,7 +6117,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -5851,7 +6219,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -5880,7 +6248,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
MetaData2,
true,
- false);
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Upload of scrambled failed.");
@@ -5888,7 +6257,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -5896,7 +6265,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false);
+ DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -5904,7 +6273,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false);
+ DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6032,64 +6401,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
- Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
- CbObject Build = Storage->GetBuild(BuildId);
- if (!m_BuildPartName.empty())
- {
- BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId();
- if (BuildPartId == Oid::Zero)
- {
- throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName));
- }
- }
- CbObject BuildPart = Storage->GetBuildPart(BuildId, BuildPartId);
- ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
- std::vector<IoHash> ChunkAttachments;
- for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
- }
- std::vector<IoHash> BlockAttachments;
- for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
- }
+ Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
- for (const IoHash& ChunkAttachment : ChunkAttachments)
- {
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- try
- {
- ValidateBlob(*Storage, BuildId, ChunkAttachment, CompressedSize, DecompressedSize);
- ZEN_CONSOLE("Chunk attachment {} ({} -> {}) is valid",
- ChunkAttachment,
- NiceBytes(CompressedSize),
- NiceBytes(DecompressedSize));
- }
- catch (const std::exception& Ex)
- {
- ZEN_CONSOLE("Failed validating chunk attachment {}: {}", ChunkAttachment, Ex.what());
- }
- }
-
- for (const IoHash& BlockAttachment : BlockAttachments)
- {
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- try
- {
- ValidateChunkBlock(*Storage, BuildId, BlockAttachment, CompressedSize, DecompressedSize);
- ZEN_CONSOLE("Block attachment {} ({} -> {}) is valid",
- BlockAttachment,
- NiceBytes(CompressedSize),
- NiceBytes(DecompressedSize));
- }
- catch (const std::exception& Ex)
- {
- ZEN_CONSOLE("Failed validating block attachment {}: {}", BlockAttachment, Ex.what());
- }
- }
+ ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName);
return AbortFlag ? 13 : 0;
}
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index fa223943b..c54fb4db1 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -78,10 +78,12 @@ private:
std::filesystem::path m_Path;
cxxopts::Options m_UploadOptions{"upload", "Upload a folder"};
+ bool m_PostUploadVerify = false;
cxxopts::Options m_DownloadOptions{"download", "Download a folder"};
std::vector<std::string> m_BuildPartNames;
std::vector<std::string> m_BuildPartIds;
+ bool m_PostDownloadVerify = false;
cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"};
std::filesystem::path m_DiffPath;
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index bb15a6fce..7f7e70fef 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -414,6 +414,11 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile
}
}
+ // A Partial Content response is accepted as-is: return before the
+ // Content-Type specific checks that follow (presumably because those checks
+ // expect a complete payload, which a partial response is not).
+ if (Response.status_code == (long)HttpResponseCode::PartialContent)
+ {
+ return true;
+ }
+
if (auto ContentType = Response.header.find("Content-Type"); ContentType != Response.header.end())
{
if (ContentType->second == "application/x-ue-comp")
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index 6dc2a20d8..1552ea823 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -599,10 +599,10 @@ MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const Chu
{
RawHashToSequenceRawHashIndex.insert(
{RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())});
- const uint32_t SequenceRawHashIndex = OverlayLookup.RawHashToSequenceRawHashIndex.at(RawHash);
- const uint32_t OrderIndexOffset = OverlayLookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex];
- const uint32_t ChunkCount = OverlayContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
- ChunkingStatistics Stats;
+ const uint32_t SequenceRawHashIndex = OverlayLookup.RawHashToSequenceIndex.at(RawHash);
+ const uint32_t OrderIndexOffset = OverlayLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex];
+ const uint32_t ChunkCount = OverlayContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
+ ChunkingStatistics Stats;
std::span<const uint32_t> OriginalChunkOrder =
std::span<const uint32_t>(OverlayContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount);
AddCunkSequence(Stats,
@@ -667,9 +667,9 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
{
RawHashToSequenceRawHashIndex.insert(
{RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())});
- const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceRawHashIndex.at(RawHash);
- const uint32_t OrderIndexOffset = BaseLookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex];
- const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
+ const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceIndex.at(RawHash);
+ const uint32_t OrderIndexOffset = BaseLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex];
+ const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
ChunkingStatistics Stats;
std::span<const uint32_t> OriginalChunkOrder =
std::span<const uint32_t>(BaseContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount);
@@ -777,46 +777,40 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
{
struct ChunkLocationReference
{
- uint32_t ChunkIndex;
- ChunkedContentLookup::ChunkLocation Location;
+ uint32_t ChunkIndex;
+ ChunkedContentLookup::ChunkSequenceLocation Location;
};
ChunkedContentLookup Result;
{
const uint32_t SequenceRawHashesCount = gsl::narrow<uint32_t>(Content.ChunkedContent.SequenceRawHashes.size());
- Result.RawHashToSequenceRawHashIndex.reserve(SequenceRawHashesCount);
- Result.SequenceRawHashIndexChunkOrderOffset.reserve(SequenceRawHashesCount);
+ Result.RawHashToSequenceIndex.reserve(SequenceRawHashesCount);
+ Result.SequenceIndexChunkOrderOffset.reserve(SequenceRawHashesCount);
uint32_t OrderOffset = 0;
for (uint32_t SequenceRawHashIndex = 0; SequenceRawHashIndex < Content.ChunkedContent.SequenceRawHashes.size();
SequenceRawHashIndex++)
{
- Result.RawHashToSequenceRawHashIndex.insert(
- {Content.ChunkedContent.SequenceRawHashes[SequenceRawHashIndex], SequenceRawHashIndex});
- Result.SequenceRawHashIndexChunkOrderOffset.push_back(OrderOffset);
+ Result.RawHashToSequenceIndex.insert({Content.ChunkedContent.SequenceRawHashes[SequenceRawHashIndex], SequenceRawHashIndex});
+ Result.SequenceIndexChunkOrderOffset.push_back(OrderOffset);
OrderOffset += Content.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
}
}
std::vector<ChunkLocationReference> Locations;
Locations.reserve(Content.ChunkedContent.ChunkOrders.size());
- for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++)
+ for (uint32_t SequenceIndex = 0; SequenceIndex < Content.ChunkedContent.SequenceRawHashes.size(); SequenceIndex++)
{
- if (Content.RawSizes[PathIndex] > 0)
+ const uint32_t OrderOffset = Result.SequenceIndexChunkOrderOffset[SequenceIndex];
+ const uint32_t ChunkCount = Content.ChunkedContent.ChunkCounts[SequenceIndex];
+ uint64_t LocationOffset = 0;
+ for (size_t OrderIndex = OrderOffset; OrderIndex < OrderOffset + ChunkCount; OrderIndex++)
{
- const IoHash& RawHash = Content.RawHashes[PathIndex];
- uint32_t SequenceRawHashIndex = Result.RawHashToSequenceRawHashIndex.at(RawHash);
- const uint32_t OrderOffset = Result.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex];
- const uint32_t ChunkCount = Content.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
- uint64_t LocationOffset = 0;
- for (size_t OrderIndex = OrderOffset; OrderIndex < OrderOffset + ChunkCount; OrderIndex++)
- {
- uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
+ uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
- Locations.push_back(ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkLocation{PathIndex, LocationOffset}});
+ Locations.push_back(
+ ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkSequenceLocation{SequenceIndex, LocationOffset}});
- LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- }
- ZEN_ASSERT(LocationOffset == Content.RawSizes[PathIndex]);
+ LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
}
}
@@ -829,18 +823,18 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
{
return false;
}
- if (Lhs.Location.PathIndex < Rhs.Location.PathIndex)
+ if (Lhs.Location.SequenceIndex < Rhs.Location.SequenceIndex)
{
return true;
}
- if (Lhs.Location.PathIndex > Rhs.Location.PathIndex)
+ if (Lhs.Location.SequenceIndex > Rhs.Location.SequenceIndex)
{
return false;
}
return Lhs.Location.Offset < Rhs.Location.Offset;
});
- Result.ChunkLocations.reserve(Locations.size());
+ Result.ChunkSequenceLocations.reserve(Locations.size());
const uint32_t ChunkCount = gsl::narrow<uint32_t>(Content.ChunkedContent.ChunkHashes.size());
Result.ChunkHashToChunkIndex.reserve(ChunkCount);
size_t RangeOffset = 0;
@@ -850,14 +844,30 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
uint32_t Count = 0;
// Gather the run of consecutive entries in Locations that belong to ChunkIndex.
// The bounds check must come first: the final run ends exactly at
// Locations.size(), and indexing one past the end is undefined behaviour.
while ((RangeOffset + Count < Locations.size()) && (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex))
{
- Result.ChunkLocations.push_back(Locations[RangeOffset + Count].Location);
+ Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location);
Count++;
}
- Result.ChunkLocationOffset.push_back(RangeOffset);
- Result.ChunkLocationCounts.push_back(Count);
+ Result.ChunkSequenceLocationOffset.push_back(RangeOffset);
+ Result.ChunkSequenceLocationCounts.push_back(Count);
RangeOffset += Count;
}
+ Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1);
+ for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++)
+ {
+ if (Content.RawSizes[PathIndex] > 0)
+ {
+ const IoHash& RawHash = Content.RawHashes[PathIndex];
+ auto SequenceIndexIt = Result.RawHashToSequenceIndex.find(RawHash);
+ ZEN_ASSERT(SequenceIndexIt != Result.RawHashToSequenceIndex.end());
+ const uint32_t SequenceIndex = SequenceIndexIt->second;
+ if (Result.SequenceIndexFirstPathIndex[SequenceIndex] == (uint32_t)-1)
+ {
+ Result.SequenceIndexFirstPathIndex[SequenceIndex] = PathIndex;
+ }
+ }
+ }
+
return Result;
}
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 15c687462..309341550 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -122,32 +122,59 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
struct ChunkedContentLookup
{
- struct ChunkLocation
+ // One occurrence of a chunk: the sequence that contains it and the byte
+ // offset of the chunk inside that sequence.
+ struct ChunkSequenceLocation
{
- uint32_t PathIndex;
+ uint32_t SequenceIndex;
uint64_t Offset;
};
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex;
- std::vector<uint32_t> SequenceRawHashIndexChunkOrderOffset;
- std::vector<ChunkLocation> ChunkLocations;
- std::vector<size_t> ChunkLocationOffset; // ChunkLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
- std::vector<uint32_t> ChunkLocationCounts; // ChunkLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
+ // Sequence raw hash -> index into ChunkedContent.SequenceRawHashes
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
+ // SequenceIndexChunkOrderOffset[SequenceIndex] -> index in ChunkedContent.ChunkOrders of the sequence's first chunk
+ std::vector<uint32_t> SequenceIndexChunkOrderOffset;
+ // All chunk locations, stored as one contiguous run per chunk index
+ std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
+ std::vector<size_t>
+ ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkSequenceLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
+ std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
+ // Entries stay at (uint32_t)-1 for sequences that no non-empty path refers to
+ std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
};
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
+// Returns {offset, count} describing the run of entries in
+// Lookup.ChunkSequenceLocations that belongs to ChunkIndex.
+// ChunkIndex is used unchecked as an index into both lookup vectors.
inline std::pair<size_t, uint32_t>
-GetChunkLocationRange(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+GetChunkSequenceLocationRange(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
{
- return std::make_pair(Lookup.ChunkLocationOffset[ChunkIndex], Lookup.ChunkLocationCounts[ChunkIndex]);
+ return std::make_pair(Lookup.ChunkSequenceLocationOffset[ChunkIndex], Lookup.ChunkSequenceLocationCounts[ChunkIndex]);
}
-inline std::span<const ChunkedContentLookup::ChunkLocation>
-GetChunkLocations(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+// Returns a view over every location recorded for ChunkIndex.
+// The span points into Lookup.ChunkSequenceLocations and owns nothing, so it
+// must not be used after Lookup is modified or destroyed.
+inline std::span<const ChunkedContentLookup::ChunkSequenceLocation>
+GetChunkSequenceLocations(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
{
- std::pair<size_t, uint32_t> Range = GetChunkLocationRange(Lookup, ChunkIndex);
- return std::span<const ChunkedContentLookup::ChunkLocation>(Lookup.ChunkLocations).subspan(Range.first, Range.second);
+ std::pair<size_t, uint32_t> Range = GetChunkSequenceLocationRange(Lookup, ChunkIndex);
+ return std::span<const ChunkedContentLookup::ChunkSequenceLocation>(Lookup.ChunkSequenceLocations).subspan(Range.first, Range.second);
+}
+
+// Maps the raw hash of a chunk sequence to its sequence index.
+// Uses the map's at(), so RawHash must be present in Lookup.RawHashToSequenceIndex.
+inline uint32_t
+GetSequenceIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(RawHash);
+ return SequenceIndex;
+}
+
+// Maps the raw hash of a chunk to its chunk index.
+// Uses the map's at(), so RawHash must be present in Lookup.ChunkHashToChunkIndex.
+// The lookup goes through the chunk map, not the sequence map: consulting
+// RawHashToSequenceIndex here would return a sequence index instead.
+inline uint32_t
+GetChunkIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ return Lookup.ChunkHashToChunkIndex.at(RawHash);
+}
+
+// Returns the first path index recorded for SequenceIndex.
+// SequenceIndex is used unchecked as a vector index. The stored value is
+// (uint32_t)-1 when no non-empty path refers to the sequence.
+inline uint32_t
+GetFirstPathIndexForSeqeuenceIndex(const ChunkedContentLookup& Lookup, const uint32_t SequenceIndex)
+{
+ const std::vector<uint32_t>& FirstPathIndexes = Lookup.SequenceIndexFirstPathIndex;
+ return FirstPathIndexes[SequenceIndex];
+}
+
+// Resolves a sequence raw hash to the first path index that has that content:
+// raw hash -> sequence index -> first path index.
+inline uint32_t
+GetFirstPathIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ return GetFirstPathIndexForSeqeuenceIndex(Lookup, GetSequenceIndexForRawHash(Lookup, RawHash));
}
namespace compactbinary_helpers {