aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-18 19:33:28 +0100
committerDan Engelbrecht <[email protected]>2025-03-18 19:33:28 +0100
commit9debdcada68980e73b3d76681be3cef0cc937edd (patch)
treec9bc6d0ca3cd3c3c5d0b943e00e2312e742c90a6 /src/zen/cmds/builds_cmd.cpp
parentMerge remote-tracking branch 'origin/main' into sb/build-cache (diff)
parentimproved reporting on async error (#312) (diff)
downloadarchived-zen-sb/build-cache.tar.xz
archived-zen-sb/build-cache.zip
Merge remote-tracking branch 'origin/main' into sb/build-cachesb/build-cache
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1765
1 files changed, 1073 insertions, 692 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 4b7c7fa8a..5ed3642d8 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -569,6 +569,64 @@ namespace {
}
};
+ struct CacheMappingStatistics
+ {
+ uint64_t CacheChunkCount = 0;
+ uint64_t CacheChunkByteCount = 0;
+
+ uint64_t CacheBlockCount = 0;
+ uint64_t CacheBlocksByteCount = 0;
+
+ uint64_t CacheSequenceHashesCount = 0;
+ uint64_t CacheSequenceHashesByteCount = 0;
+
+ uint32_t LocalPathsMatchingSequencesCount = 0;
+ uint64_t LocalPathsMatchingSequencesByteCount = 0;
+
+ uint64_t LocalChunkMatchingRemoteCount = 0;
+ uint64_t LocalChunkMatchingRemoteByteCount = 0;
+ };
+
+ struct DownloadStatistics
+ {
+ std::atomic<uint64_t> RequestsCompleteCount = 0;
+
+ std::atomic<uint64_t> DownloadedChunkCount = 0;
+ std::atomic<uint64_t> DownloadedChunkByteCount = 0;
+ std::atomic<uint64_t> MultipartAttachmentCount = 0;
+
+ std::atomic<uint64_t> DownloadedBlockCount = 0;
+ std::atomic<uint64_t> DownloadedBlockByteCount = 0;
+
+ std::atomic<uint64_t> DownloadedPartialBlockCount = 0;
+ std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0;
+ };
+
+ struct WriteChunkStatistics
+ {
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<uint64_t> ChunkBytesWritten = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
+ };
+
+ struct RebuildFolderStateStatistics
+ {
+ uint64_t CleanFolderElapsedWallTimeUs = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0;
+ uint64_t FinalizeTreeElapsedWallTimeUs = 0;
+ };
+
+ struct VerifyFolderStatistics
+ {
+ std::atomic<uint64_t> FilesVerified = 0;
+ std::atomic<uint64_t> FilesFailed = 0;
+ std::atomic<uint64_t> ReadBytes = 0;
+ uint64_t VerifyElapsedWallTimeUs = 0;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -1035,7 +1093,15 @@ namespace {
class BufferedOpenFile
{
public:
- BufferedOpenFile(const std::filesystem::path Path) : Source(Path, BasicFile::Mode::kRead), SourceSize(Source.FileSize()) {}
+ BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats)
+ : m_Source(Path, BasicFile::Mode::kRead)
+ , m_SourceSize(m_Source.FileSize())
+ , m_DiskStats(DiskStats)
+ {
+ m_DiskStats.OpenReadCount++;
+ m_DiskStats.CurrentOpenFileCount++;
+ }
+ ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; }
BufferedOpenFile() = delete;
BufferedOpenFile(const BufferedOpenFile&) = delete;
BufferedOpenFile(BufferedOpenFile&&) = delete;
@@ -1047,10 +1113,10 @@ namespace {
{
ZEN_TRACE_CPU("BufferedOpenFile::GetRange");
- ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache);
- auto _ = MakeGuard([&]() { ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); });
+ ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache);
+ auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); });
- ZEN_ASSERT((Offset + Size) <= SourceSize);
+ ZEN_ASSERT((Offset + Size) <= m_SourceSize);
const uint64_t BlockIndexStart = Offset / BlockSize;
const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize;
@@ -1061,21 +1127,23 @@ namespace {
for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++)
{
const uint64_t BlockStartOffset = BlockIndex * BlockSize;
- if (CacheBlockIndex != BlockIndex)
+ if (m_CacheBlockIndex != BlockIndex)
{
- uint64_t CacheSize = Min(BlockSize, SourceSize - BlockStartOffset);
+ uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset);
ZEN_ASSERT(CacheSize > 0);
- Cache = IoBuffer(CacheSize);
- Source.Read(Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
- CacheBlockIndex = BlockIndex;
+ m_Cache = IoBuffer(CacheSize);
+ m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += CacheSize;
+ m_CacheBlockIndex = BlockIndex;
}
const uint64_t BytesRead = ReadOffset - Offset;
ZEN_ASSERT(BlockStartOffset <= ReadOffset);
const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset;
- ZEN_ASSERT(OffsetIntoBlock < Cache.GetSize());
- const uint64_t BlockBytes = Min(Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
- BufferRanges.emplace_back(SharedBuffer(IoBuffer(Cache, OffsetIntoBlock, BlockBytes)));
+ ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize());
+ const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
+ BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes)));
ReadOffset += BlockBytes;
}
CompositeBuffer Result(std::move(BufferRanges));
@@ -1084,10 +1152,11 @@ namespace {
}
private:
- BasicFile Source;
- const uint64_t SourceSize;
- uint64_t CacheBlockIndex = (uint64_t)-1;
- IoBuffer Cache;
+ BasicFile m_Source;
+ const uint64_t m_SourceSize;
+ DiskStatistics& m_DiskStats;
+ uint64_t m_CacheBlockIndex = (uint64_t)-1;
+ IoBuffer m_Cache;
};
class ReadFileCache
@@ -1106,11 +1175,7 @@ namespace {
{
m_OpenFiles.reserve(MaxOpenFileCount);
}
- ~ReadFileCache()
- {
- m_DiskStats.CurrentOpenFileCount -= m_OpenFiles.size();
- m_OpenFiles.clear();
- }
+ ~ReadFileCache() { m_OpenFiles.clear(); }
CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
{
@@ -1128,7 +1193,6 @@ namespace {
m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
}
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
return Result;
}
const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
@@ -1136,20 +1200,15 @@ namespace {
if (Size == m_LocalContent.RawSizes[LocalPathIndex])
{
IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
- m_DiskStats.OpenReadCount++;
- m_DiskStats.ReadByteCount += Result.GetSize();
return CompositeBuffer(SharedBuffer(Result));
}
if (m_OpenFiles.size() == m_OpenFiles.capacity())
{
m_OpenFiles.pop_back();
- m_DiskStats.CurrentOpenFileCount--;
}
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath)));
+ m_OpenFiles.insert(m_OpenFiles.begin(),
+ std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats)));
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
- m_DiskStats.OpenReadCount++;
- m_DiskStats.CurrentOpenFileCount++;
return Result;
}
@@ -1186,17 +1245,21 @@ namespace {
}
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ RawSize,
+ [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
}
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -1349,8 +1412,7 @@ namespace {
const std::uint64_t PreferredMultipartChunkSize,
ParallellWork& Work,
WorkerThreadPool& NetworkPool,
- std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& MultipartAttachmentCount,
+ DownloadStatistics& DownloadStats,
std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -1372,10 +1434,10 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [Workload, &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
- const IoBuffer& Chunk,
- uint64_t BytesRemaining) {
- BytesDownloaded += Chunk.GetSize();
+ [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
+ const IoBuffer& Chunk,
+ uint64_t BytesRemaining) {
+ DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
if (!AbortFlag.load())
{
@@ -1383,6 +1445,7 @@ namespace {
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
+ DownloadStats.DownloadedChunkCount++;
uint64_t PayloadSize = Workload->TempFile.FileSize();
void* FileHandle = Workload->TempFile.Detach();
ZEN_ASSERT(FileHandle != nullptr);
@@ -1394,7 +1457,7 @@ namespace {
});
if (!WorkItems.empty())
{
- MultipartAttachmentCount++;
+ DownloadStats.MultipartAttachmentCount++;
}
for (auto& WorkItem : WorkItems)
{
@@ -1411,7 +1474,23 @@ namespace {
}
}
- void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
+
+ void ValidateBuildPart(BuildStorage& Storage,
+ const Oid& BuildId,
+ Oid BuildPartId,
+ const std::string_view BuildPartName,
+ ValidateStatistics& ValidateStats,
+ DownloadStatistics& DownloadStats)
{
Stopwatch Timer;
auto _ = MakeGuard([&]() {
@@ -1430,23 +1509,27 @@ namespace {
throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
}
}
+ ValidateStats.BuildBlobSize = Build.GetSize();
uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
{
PreferredMultipartChunkSize = ChunkSize;
}
- CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ ValidateStats.BuildPartSize = BuildPart.GetSize();
ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
std::vector<IoHash> ChunkAttachments;
for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
{
ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
}
+ ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
std::vector<IoHash> BlockAttachments;
for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
{
BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
}
+ ValidateStats.BlockAttachmentCount = BlockAttachments.size();
std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
@@ -1456,7 +1539,6 @@ namespace {
}
WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& ReadPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
ParallellWork Work(AbortFlag);
@@ -1472,20 +1554,16 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
- std::atomic<uint64_t> DownloadedAttachmentCount = 0;
- std::atomic<uint64_t> VerifiedAttachmentCount = 0;
- std::atomic<uint64_t> DownloadedByteCount = 0;
- std::atomic<uint64_t> VerifiedByteCount = 0;
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredVerifiedBytesPerSecond;
+ uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredVerifiedBytesPerSecond;
std::atomic<uint64_t> MultipartAttachmentCount = 0;
for (const IoHash& ChunkAttachment : ChunkAttachments)
{
Work.ScheduleWork(
- ReadPool,
+ NetworkPool,
[&, ChunkAttachment](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -1499,8 +1577,7 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- DownloadedByteCount,
- MultipartAttachmentCount,
+ DownloadStats,
[&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
Payload.SetContentType(ZenContentType::kCompressedBinary);
if (!AbortFlag)
@@ -1512,6 +1589,12 @@ namespace {
{
ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
@@ -1521,9 +1604,9 @@ namespace {
ChunkHash,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1548,9 +1631,9 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadedAttachmentCount++;
- DownloadedByteCount += Payload.GetSize();
- if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -1576,9 +1659,9 @@ namespace {
BlockAttachment,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1594,17 +1677,20 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
+ const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
+
FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
- FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount);
+ FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount);
std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
- DownloadedAttachmentCount.load(),
+ DownloadedAttachmentCount,
AttachmentsToVerifyCount,
- NiceBytes(DownloadedByteCount.load()),
+ NiceBytes(DownloadedByteCount),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- VerifiedAttachmentCount.load(),
+ ValidateStats.VerifiedAttachmentCount.load(),
AttachmentsToVerifyCount,
- NiceBytes(VerifiedByteCount.load()),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
ProgressBar.UpdateState(
@@ -1612,11 +1698,12 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
false);
});
ProgressBar.Finish();
+ ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
}
void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
@@ -1717,7 +1804,9 @@ namespace {
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
uint32_t ChunkIndex,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ std::atomic<uint64_t>& ReadRawBytes,
+ LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("CompressChunk");
ZEN_ASSERT(!TempFolderPath.empty());
@@ -1750,7 +1839,12 @@ namespace {
bool CouldCompress = CompressedBuffer::CompressToStream(
CompositeBuffer(SharedBuffer(RawSource)),
- [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); });
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ReadRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset);
+ LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ });
if (CouldCompress)
{
uint64_t CompressedSize = CompressedFile.FileSize();
@@ -1768,6 +1862,9 @@ namespace {
ZEN_ASSERT(Compressed);
ZEN_ASSERT(RawHash == ChunkHash);
ZEN_ASSERT(RawSize == ChunkSize);
+
+ LooseChunksStats.CompressedChunkCount++;
+
return Compressed.GetCompressed();
}
CompressedFile.Close();
@@ -2007,7 +2104,6 @@ namespace {
const std::uint64_t LargeAttachmentSize,
DiskStatistics& DiskStats,
UploadStatistics& UploadStats,
- GenerateBlocksStatistics& GenerateBlocksStats,
LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("UploadPartBlobs");
@@ -2257,8 +2353,6 @@ namespace {
Payload = std::move(CompressedBlock).GetCompressed();
}
- GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
GeneratedBlockCount++;
if (GeneratedBlockCount == GenerateBlockIndexes.size())
@@ -2279,9 +2373,7 @@ namespace {
}
}
- std::atomic<uint64_t> CompressedLooseChunkCount = 0;
- std::atomic<uint64_t> CompressedLooseChunkByteCount = 0;
- std::atomic<uint64_t> RawLooseChunkByteCount = 0;
+ std::atomic<uint64_t> RawLooseChunkByteCount = 0;
// Start compression of any non-precompressed loose chunks and schedule upload
for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes)
@@ -2295,19 +2387,20 @@ namespace {
ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
FilteredCompressedBytesPerSecond.Start();
- CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName);
+ CompositeBuffer Payload = CompressChunk(Path,
+ Content,
+ Lookup,
+ ChunkIndex,
+ Path / ZenTempChunkFolderName,
+ RawLooseChunkByteCount,
+ LooseChunksStats);
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})",
Content.ChunkedContent.ChunkHashes[ChunkIndex],
NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
NiceBytes(Payload.GetSize()));
const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
UploadStats.ReadFromDiskBytes += ChunkRawSize;
- LooseChunksStats.CompressedChunkBytes += Payload.GetSize();
- LooseChunksStats.CompressedChunkCount++;
- CompressedLooseChunkByteCount += Payload.GetSize();
- CompressedLooseChunkCount++;
- RawLooseChunkByteCount += ChunkRawSize;
- if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size())
+ if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size())
{
FilteredCompressedBytesPerSecond.Stop();
}
@@ -2322,20 +2415,21 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load());
+ FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
std::string Details = fmt::format(
- "Compressed {}/{} ({}/{}) chunks. "
+ "Compressed {}/{} ({}/{} {}B/s) chunks. "
"Uploaded {}/{} ({}/{}) blobs "
"({} {}bits/s)",
- CompressedLooseChunkCount.load(),
+ LooseChunksStats.CompressedChunkCount.load(),
CompressLooseChunkOrderIndexes.size(),
NiceBytes(RawLooseChunkByteCount),
NiceBytes(TotalLooseChunksSize),
+ NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
UploadedBlockCount.load() + UploadedChunkCount.load(),
UploadBlockCount + UploadChunkCount,
@@ -2355,9 +2449,8 @@ namespace {
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2550,6 +2643,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName);
+ std::uint64_t TotalRawSize = 0;
+
CbObject ChunkerParameters;
struct PrepareBuildResult
@@ -2770,7 +2865,7 @@ namespace {
ChunkerParameters = ChunkParametersWriter.Save();
}
- std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
{
ProgressBar ProgressBar(UsePlainProgress);
@@ -3139,21 +3234,16 @@ namespace {
{
ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray<IoHash>(RawHashes, "\n "sv));
- UploadStatistics TempUploadStats;
- GenerateBlocksStatistics TempGenerateBlocksStats;
- LooseChunksStatistics TempLooseChunksStats;
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
Stopwatch TempUploadTimer;
auto __ = MakeGuard([&]() {
uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
ZEN_CONSOLE(
- "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. "
+ "Uploaded {} ({}) blocks. "
"Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
"Transferred {} ({}bits/s) in {}",
- TempGenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()),
- NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
- TempGenerateBlocksStats.GeneratedBlockByteCount)),
TempUploadStats.BlockCount.load(),
NiceBytes(TempUploadStats.BlocksBytes),
@@ -3180,11 +3270,9 @@ namespace {
LargeAttachmentSize,
DiskStats,
TempUploadStats,
- TempGenerateBlocksStats,
TempLooseChunksStats);
UploadStats += TempUploadStats;
LooseChunksStats += TempLooseChunksStats;
- GenerateBlocksStats += TempGenerateBlocksStats;
}
};
if (IgnoreExistingBlocks)
@@ -3266,18 +3354,25 @@ namespace {
}
}
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName);
+ ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
- const double DeltaByteCountPercent =
- ChunkingStats.BytesHashed > 0
- ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
- : 0.0;
-
- const std::string LargeAttachmentStats =
- (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedByteCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
ZEN_CONSOLE_VERBOSE(
"Folder scanning stats:"
@@ -3401,42 +3496,122 @@ namespace {
UploadStats.MultipartAttachmentCount.load(),
NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000));
+ if (PostUploadVerify)
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Validate stats:"
+ "\n BuildBlobSize: {}"
+ "\n BuildPartSize: {}"
+ "\n ChunkAttachmentCount: {}"
+ "\n BlockAttachmentCount: {}"
+ "\n VerifiedAttachmentCount: {}"
+ "\n VerifiedByteCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ NiceBytes(ValidateStats.BuildBlobSize),
+ NiceBytes(ValidateStats.BuildPartSize),
+ ValidateStats.ChunkAttachmentCount,
+ ValidateStats.BlockAttachmentCount,
+ ValidateStats.VerifiedAttachmentCount.load(),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
+ NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE_VERBOSE(
+ "Validate download stats:"
+ "\n RequestsCompleteCount: {}"
+ "\n DownloadedChunkCount: {}"
+ "\n DownloadedChunkByteCount: {}"
+ "\n MultipartAttachmentCount: {}"
+ "\n DownloadedBlockCount: {}"
+ "\n DownloadedBlockByteCount: {}"
+ "\n DownloadedPartialBlockCount: {}"
+ "\n DownloadedPartialBlockByteCount: {}",
+ ValidateDownloadStats.RequestsCompleteCount.load(),
+ ValidateDownloadStats.DownloadedChunkCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()),
+ ValidateDownloadStats.MultipartAttachmentCount.load(),
+ ValidateDownloadStats.DownloadedBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()),
+ ValidateDownloadStats.DownloadedPartialBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load()));
+ }
+
+ const double DeltaByteCountPercent =
+ ChunkingStats.BytesHashed > 0
+ ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
+ : 0.0;
+
+ const std::string MultipartAttachmentStats =
+ (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+
+ std::string ValidateInfo;
+ if (PostUploadVerify)
+ {
+ const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount =
+ ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ ValidateInfo = fmt::format("\n Verified: {} ({}) {}B/sec in {}",
+ DownloadedCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ }
+
ZEN_CONSOLE(
- "Uploaded {}\n"
- " Delta: {}/{} ({:.1f}%)\n"
- " Blocks: {} ({})\n"
- " Chunks: {} ({}){}\n"
- " Rate: {}bits/sec",
- NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ "Uploaded part {} ('{}') to build {} in {} \n"
+ " Scanned files: {} ({}) {}B/sec in {}\n"
+ " New data: {} ({:.1f}%)\n"
+ " New blocks: {} ({}) {}B/sec\n"
+ " New chunks: {} ({} -> {}) {}B/sec\n"
+ " Uploaded: {} ({}) {}bits/sec\n"
+ " Blocks: {} ({})\n"
+ " Chunks: {} ({}){}"
+ "{}",
+ BuildPartId,
+ BuildPartName,
+ BuildId,
+ NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()),
+
+ LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()),
+ NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)),
+ NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000),
NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- NiceBytes(ChunkingStats.BytesHashed),
DeltaByteCountPercent,
+ GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)),
+
+ LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(LooseChunksStats.ChunkByteCount),
+ NiceBytes(LooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)),
+
+ NiceBytes(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load()),
+ NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))),
+
UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes),
- UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.ChunksBytes),
- LargeAttachmentStats,
+ NiceBytes(UploadStats.BlocksBytes.load()),
- NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))));
+ UploadStats.ChunkCount.load(),
+ NiceBytes(UploadStats.ChunksBytes.load()),
+ MultipartAttachmentStats,
- ZEN_CONSOLE("Uploaded ({}) build {} part {} ({}) in {}",
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- BuildId,
- BuildPartName,
- BuildPartId,
- NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()));
+ ValidateInfo);
}
- void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash)
+ void VerifyFolder(const ChunkedFolderContent& Content,
+ const std::filesystem::path& Path,
+ bool VerifyFileHash,
+ VerifyFolderStatistics& VerifyFolderStats)
{
ZEN_TRACE_CPU("VerifyFolder");
- ProgressBar ProgressBar(UsePlainProgress);
- std::atomic<uint64_t> FilesVerified(0);
- std::atomic<uint64_t> FilesFailed(0);
- std::atomic<uint64_t> ReadBytes(0);
+ Stopwatch Timer;
+
+ ProgressBar ProgressBar(UsePlainProgress);
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -3492,7 +3667,7 @@ namespace {
ErrorLock.WithExclusiveLock([&]() {
Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else
{
@@ -3504,7 +3679,7 @@ namespace {
Errors.push_back(
fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk < ExpectedSize)
{
@@ -3514,7 +3689,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > ExpectedSize)
{
@@ -3524,7 +3699,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > 0 && VerifyFileHash)
{
@@ -3559,13 +3734,13 @@ namespace {
}
FileOffset += ChunkSize;
}
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
- ReadBytes += SizeOnDisk;
+ VerifyFolderStats.ReadBytes += SizeOnDisk;
}
}
}
- FilesVerified++;
+ VerifyFolderStats.FilesVerified++;
}
},
[&, PathIndex](const std::exception& Ex, std::atomic<bool>&) {
@@ -3574,23 +3749,25 @@ namespace {
(Path / Content.Paths[PathIndex]).make_preferred(),
Ex.what()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
});
}
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
- FilesVerified.load(),
+ VerifyFolderStats.FilesVerified.load(),
PathCount,
- NiceBytes(ReadBytes.load()),
- FilesFailed.load());
+ NiceBytes(VerifyFolderStats.ReadBytes.load()),
+ VerifyFolderStats.FilesFailed.load());
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
false);
});
+ VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
ProgressBar.Finish();
for (const std::string& Error : Errors)
{
@@ -3605,7 +3782,7 @@ namespace {
class WriteFileCache
{
public:
- WriteFileCache() {}
+ WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
~WriteFileCache() { Flush(); }
template<typename TBufferType>
@@ -3621,6 +3798,8 @@ namespace {
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
ZEN_ASSERT(OpenFileWriter);
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
}
else
{
@@ -3643,6 +3822,8 @@ namespace {
}
return --Tries > 0;
});
+ m_DiskStats.OpenWriteCount++;
+ m_DiskStats.CurrentOpenFileCount++;
}
const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
@@ -3654,12 +3835,18 @@ namespace {
OutputFile = std::move(NewOutputFile);
OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
SeenTargetIndexes.push_back(TargetIndex);
}
else
{
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
NewOutputFile->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
+ NewOutputFile = {};
+ m_DiskStats.CurrentOpenFileCount--;
}
}
}
@@ -3667,9 +3854,15 @@ namespace {
void Flush()
{
ZEN_TRACE_CPU("WriteFileCache_Flush");
+ if (OutputFile)
+ {
+ m_DiskStats.CurrentOpenFileCount--;
+ }
+
OpenFileWriter = {};
OutputFile = {};
}
+ DiskStatistics& m_DiskStats;
std::vector<uint32_t> SeenTargetIndexes;
std::unique_ptr<BasicFile> OutputFile;
std::unique_ptr<BasicFileWriter> OpenFileWriter;
@@ -3696,6 +3889,100 @@ namespace {
return ChunkTargetPtrs;
};
+ void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequence");
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ }
+
+ void FinalizeChunkSequences(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequences");
+ for (uint32_t SequenceIndex : RemoteSequenceIndexes)
+ {
+ FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ }
+ }
+
+ void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool)
+ {
+ if (RemoteSequenceIndexes.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(
+ VerifyPool,
+ [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+
+ bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1;
+ }
+
+ std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ std::vector<uint32_t> CompletedSequenceIndexes;
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+ return CompletedSequenceIndexes;
+ }
+
struct BlockWriteOps
{
std::vector<CompositeBuffer> ChunkBuffers;
@@ -3712,12 +3999,14 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
- WriteFileCache OpenFileCache;
+ WriteFileCache OpenFileCache(DiskStats);
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
if (AbortFlag)
@@ -3743,36 +4032,76 @@ namespace {
Chunk,
FileOffset,
RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
}
+ WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ WriteChunkStats.ChunkBytesWritten +=
+ std::accumulate(Ops.ChunkBuffers.begin(),
+ Ops.ChunkBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
- ZEN_TRACE_CPU("VerifyChunkHashes_rename");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
+ }
+ }
+
+ IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
+ {
+ ZEN_TRACE_CPU("MakeBufferMemoryBased");
+ IoBuffer BlockMemoryBuffer;
+ std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBufferFileReference FileRef = {};
+ if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef))
+ {
+ BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer();
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ return BlockMemoryBuffer;
+ }
+ else
+ {
+ return PartialBlockBuffer.GetSegments().front().AsIoBuffer();
+ }
+ }
+ else
+ {
+ // Not a homogenous memory buffer, read all to memory
+
+ BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer();
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ IoBufferFileReference FileRef = {};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ Reader.Detach();
+ ReadMem = ReadMem.Mid(FileRef.FileChunkSize);
+ }
+ else
+ {
+ ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView());
}
}
- OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ return BlockMemoryBuffer;
}
}
@@ -3780,31 +4109,14 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<const IoHash> ChunkRawHashes,
std::span<const uint32_t> ChunkCompressedLengths,
- std::span<const uint32_t> ChunkRawLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- CompositeBuffer&& PartialBlockBuffer,
+ const MemoryView BlockView,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
BlockWriteOps& OutOps)
{
ZEN_TRACE_CPU("GetBlockWriteOps");
- MemoryView BlockMemoryView;
- UniqueBuffer BlockMemoryBuffer;
- IoBufferFileReference FileRef = {};
- if (PartialBlockBuffer.GetSegments().size() == 1 && PartialBlockBuffer.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
- {
- BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize);
- BasicFile Reader;
- Reader.Attach(FileRef.FileHandle);
- Reader.Read(BlockMemoryBuffer.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
- BlockMemoryView = BlockMemoryBuffer.GetView();
- Reader.Detach();
- }
- else
- {
- BlockMemoryView = PartialBlockBuffer.ViewOrCopyRange(0, PartialBlockBuffer.GetSize(), BlockMemoryBuffer);
- }
uint32_t OffsetInBlock = 0;
for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
{
@@ -3821,32 +4133,9 @@ namespace {
bool NeedsWrite = true;
if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
{
- // CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
- MemoryView ChunkMemory = BlockMemoryView.Mid(OffsetInBlock, ChunkCompressedSize);
- CompositeBuffer Chunk = CompositeBuffer(IoBuffer(IoBuffer::Wrap, ChunkMemory.GetData(), ChunkMemory.GetSize()));
- IoHash VerifyChunkHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
- if (!Compressed)
- {
- ZEN_ASSERT(false);
- }
- if (VerifyChunkHash != ChunkHash)
- {
- ZEN_ASSERT(false);
- }
- if (!ChunkRawLengths.empty())
- {
- if (VerifyRawSize != ChunkRawLengths[ChunkBlockIndex])
- {
- ZEN_ASSERT(false);
- }
- }
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (!Decompressed)
- {
- throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
- }
+ MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(),
+ ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder());
+ IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize());
ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
@@ -3861,19 +4150,22 @@ namespace {
OffsetInBlock += ChunkCompressedSize;
}
- std::sort(OutOps.WriteOps.begin(),
- OutOps.WriteOps.end(),
- [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
+ {
+ ZEN_TRACE_CPU("GetBlockWriteOps_sort");
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ }
return true;
}
@@ -3881,34 +4173,35 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
BlockWriteOps Ops;
if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
{
ZEN_TRACE_CPU("WriteBlockToDisk_Legacy");
- UniqueBuffer CopyBuffer;
- const MemoryView BlockView = BlockBuffer.ViewOrCopyRange(0, BlockBuffer.GetSize(), CopyBuffer);
uint64_t HeaderSize;
- const std::vector<uint32_t> ChunkCompressedLengths = ReadChunkBlockHeader(BlockView, HeaderSize);
-
- CompositeBuffer PartialBlockBuffer = std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize);
+ const std::vector<uint32_t> ChunkCompressedLengths =
+ ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
0,
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
@@ -3918,23 +4211,22 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
}
- CompositeBuffer PartialBlockBuffer =
- std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
0,
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
@@ -3944,8 +4236,10 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
@@ -3955,24 +4249,29 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& PartialBlockBuffer,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
BlockWriteOps Ops;
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView,
FirstIncludedBlockChunkIndex,
LastIncludedBlockChunkIndex,
Ops))
@@ -3982,8 +4281,10 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
else
@@ -4031,8 +4332,7 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
CompositeBuffer&& ChunkData,
- WriteFileCache& OpenFileCache,
- std::atomic<uint64_t>& OutBytesWritten)
+ WriteFileCache& OpenFileCache)
{
ZEN_TRACE_CPU("WriteChunkToDisk");
@@ -4051,7 +4351,6 @@ namespace {
ChunkData,
FileOffset,
Content.RawSizes[PathIndex]);
- OutBytesWritten += ChunkData.GetSize();
}
}
@@ -4072,7 +4371,8 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4095,21 +4395,29 @@ namespace {
{
throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
}
+
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_TRACE_CPU("StreamDecompress_Write");
- if (!AbortFlag)
- {
- DecompressedTemp.Write(RangeBuffer, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
+ DecompressedTemp.Write(RangeBuffer, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ DiskStats.WriteByteCount += RangeBuffer.GetSize();
+ DiskStats.WriteCount++;
+ return true;
}
- WriteToDiskBytes += RangeBuffer.GetSize();
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -4132,6 +4440,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
+ WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4140,74 +4449,40 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes);
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
}
else
{
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- SharedBuffer Chunk =
- Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize);
if (!AbortFlag)
{
- WriteFileCache OpenFileCache;
-
+ WriteFileCache OpenFileCache(DiskStats);
WriteChunkToDisk(TargetFolder,
RemoteContent,
RemoteLookup,
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
- OpenFileCache,
- WriteToDiskBytes);
+ OpenFileCache);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
return false;
}
- void CompleteChunkTargets(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const IoHash& ChunkHash,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const bool NeedHashVerify)
- {
- ZEN_TRACE_CPU("CompleteChunkTargets");
-
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (NeedHashVerify)
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
-
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != ChunkHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash));
- }
- }
-
- ZEN_TRACE_CPU("RenameToFinal");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
- }
- }
- }
-
void AsyncWriteDownloadedChunk(const std::filesystem::path& Path,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
@@ -4217,19 +4492,17 @@ namespace {
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::atomic<uint64_t>& WriteToDiskBytes,
- std::atomic<uint32_t>& ChunkCountWritten,
std::atomic<uint64_t>& WritePartsComplete,
- std::atomic<uint64_t>& TotalPartWriteCount,
- std::atomic<uint64_t>& LooseChunksBytes,
- FilteredRate& FilteredWrittenBytesPerSecond)
+ const uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredWrittenBytesPerSecond,
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- uint64_t Size = Payload.GetSize();
- LooseChunksBytes += Size;
+ const uint64_t Size = Payload.GetSize();
std::filesystem::path CompressedChunkPath;
@@ -4275,6 +4548,7 @@ namespace {
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
RemoteChunkIndex,
+ TotalPartWriteCount,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
@@ -4307,11 +4581,10 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
-
+ DiskStats,
+ WriteChunkStats);
if (!AbortFlag)
{
- ChunkCountWritten++;
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
@@ -4320,12 +4593,16 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -4343,19 +4620,16 @@ namespace {
const std::vector<IoHash>& LooseChunkHashes,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
- FolderContent& OutLocalFolderState)
+ FolderContent& OutLocalFolderState,
+ DiskStatistics& DiskStats,
+ CacheMappingStatistics& CacheMappingStats,
+ DownloadStatistics& DownloadStats,
+ WriteChunkStatistics& WriteChunkStats,
+ RebuildFolderStateStatistics& RebuildFolderStateStats)
{
ZEN_TRACE_CPU("UpdateFolder");
ZEN_UNUSED(WipeTargetFolder);
- std::atomic<uint64_t> DownloadedBlocks = 0;
- std::atomic<uint64_t> BlockBytes = 0;
- std::atomic<uint64_t> DownloadedChunks = 0;
- std::atomic<uint64_t> LooseChunksBytes = 0;
- std::atomic<uint64_t> WriteToDiskBytes = 0;
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
-
- DiskStatistics DiskStats;
Stopwatch IndexTimer;
@@ -4370,15 +4644,11 @@ namespace {
Stopwatch CacheMappingTimer;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
- // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false);
- std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- uint64_t CachedChunkHashesByteCountFound = 0;
- uint64_t CachedSequenceHashesByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
@@ -4399,7 +4669,8 @@ namespace {
if (ChunkSize == CacheDirContent.FileSizes[Index])
{
CachedChunkHashesFound.insert({FileHash, ChunkIndex});
- CachedChunkHashesByteCountFound += ChunkSize;
+ CacheMappingStats.CacheChunkCount++;
+ CacheMappingStats.CacheChunkByteCount += ChunkSize;
continue;
}
}
@@ -4412,7 +4683,8 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CachedSequenceHashesByteCountFound += SequenceSize;
+ CacheMappingStats.CacheSequenceHashesCount += SequenceSize;
+ CacheMappingStats.CacheSequenceHashesByteCount++;
continue;
}
}
@@ -4422,7 +4694,6 @@ namespace {
}
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- uint64_t CachedBlocksByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
@@ -4457,7 +4728,8 @@ namespace {
if (BlockSize == BlockDirContent.FileSizes[Index])
{
CachedBlocksFound.insert({FileHash, BlockIndex});
- CachedBlocksByteCountFound += BlockSize;
+ CacheMappingStats.CacheBlockCount++;
+ CacheMappingStats.CacheBlocksByteCount += BlockSize;
continue;
}
}
@@ -4467,7 +4739,6 @@ namespace {
}
std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0;
// Pick up all whole files we can use from current local state
{
ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
@@ -4496,13 +4767,14 @@ namespace {
const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
- LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize;
+ CacheMappingStats.LocalPathsMatchingSequencesCount++;
+ CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize;
}
else
{
// We must write the sequence
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] =
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
}
}
}
@@ -4514,7 +4786,7 @@ namespace {
struct ChunkTarget
{
uint32_t TargetChunkLocationCount = (uint32_t)-1;
- uint64_t ChunkRawSize = (uint64_t)-1;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
uint64_t CacheFileOffset = (uint64_t)-1;
};
std::vector<ChunkTarget> ChunkTargets;
@@ -4522,8 +4794,6 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
- uint64_t LocalChunkHashesMatchingRemoteCount = 0;
- uint64_t LocalChunkHashesMatchingRemoteByteCount = 0;
{
ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
@@ -4556,7 +4826,7 @@ namespace {
{
CacheCopyData::ChunkTarget Target = {
.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .ChunkRawSize = LocalChunkRawSize,
+ .RemoteChunkIndex = RemoteChunkIndex,
.CacheFileOffset = SourceOffset};
if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
CopySourceIt != RawHashToCacheCopyDataIndex.end())
@@ -4586,8 +4856,8 @@ namespace {
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
}
- LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize;
- LocalChunkHashesMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
}
}
@@ -4599,20 +4869,20 @@ namespace {
}
if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() ||
- !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0)
+ !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
{
ZEN_CONSOLE(
"Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks",
CachedSequenceHashesFound.size(),
- NiceBytes(CachedSequenceHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
CachedChunkHashesFound.size(),
- NiceBytes(CachedChunkHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheChunkByteCount),
CachedBlocksFound.size(),
- NiceBytes(CachedBlocksByteCountFound),
+ NiceBytes(CacheMappingStats.CacheBlocksByteCount),
LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes),
- LocalChunkHashesMatchingRemoteCount,
- NiceBytes(LocalChunkHashesMatchingRemoteByteCount));
+ NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
}
uint32_t ChunkCountToWrite = 0;
@@ -4635,9 +4905,7 @@ namespace {
}
uint64_t TotalRequestCount = 0;
- std::atomic<uint64_t> RequestsComplete = 0;
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> TotalPartWriteCount = 0;
+ uint64_t TotalPartWriteCount = 0;
std::atomic<uint64_t> WritePartsComplete = 0;
{
@@ -4654,8 +4922,6 @@ namespace {
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
- std::atomic<uint64_t> BytesDownloaded = 0;
-
struct LooseChunkHashWorkData
{
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
@@ -4737,12 +5003,6 @@ namespace {
std::vector<uint32_t> FullBlockWorks;
- size_t BlocksNeededCount = 0;
- uint64_t AllBlocksSize = 0;
- uint64_t AllBlocksFetch = 0;
- uint64_t AllBlocksSlack = 0;
- uint64_t AllBlockRequests = 0;
- uint64_t AllBlockChunksSize = 0;
for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
@@ -4798,7 +5058,6 @@ namespace {
}
else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- AllBlockChunksSize += ChunkCompressedLength;
if (NextRange.RangeLength == 0)
{
NextRange.RangeStart = CurrentOffset;
@@ -4815,7 +5074,6 @@ namespace {
ZEN_ASSERT(false);
}
}
- AllBlocksSize += CurrentOffset;
if (NextRange.RangeLength > 0)
{
BlockRanges.push_back(NextRange);
@@ -4825,7 +5083,6 @@ namespace {
std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
auto It = BlockRanges.begin();
CollapsedBlockRanges.push_back(*It++);
- uint64_t TotalSlack = 0;
while (It != BlockRanges.end())
{
BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
@@ -4836,7 +5093,6 @@ namespace {
LastRange.ChunkBlockIndexCount =
(It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
- TotalSlack += Slack;
}
else
{
@@ -4845,17 +5101,6 @@ namespace {
++It;
}
- uint64_t TotalFetch = 0;
- for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
- {
- TotalFetch += Range.RangeLength;
- }
-
- AllBlocksFetch += TotalFetch;
- AllBlocksSlack += TotalSlack;
- BlocksNeededCount++;
- AllBlockRequests += CollapsedBlockRanges.size();
-
TotalRequestCount += CollapsedBlockRanges.size();
TotalPartWriteCount += CollapsedBlockRanges.size();
@@ -4863,7 +5108,6 @@ namespace {
}
else
{
- BlocksNeededCount++;
TotalRequestCount++;
TotalPartWriteCount++;
@@ -4877,154 +5121,6 @@ namespace {
}
}
- for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
- [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
-
- uint64_t CacheLocalFileBytesRead = 0;
-
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
-
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- uint64_t CacheFileOffset = (uint64_t)-1;
- uint64_t ChunkSize = (uint64_t)-1;
- };
-
- std::vector<WriteOp> WriteOps;
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Sort");
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
- {
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
- }
-
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
- }
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Write");
-
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
- {
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
- }
- }
- if (!AbortFlag)
- {
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOp& Op : WriteOps)
- {
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
-
- ZEN_TRACE_CPU("rename");
- ZEN_ASSERT_SLOW(
- !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
- }
-
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
- }
-
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
if (AbortFlag)
@@ -5057,9 +5153,8 @@ namespace {
uint64_t RawSize;
if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
{
- LooseChunksBytes += ExistingCompressedPart.GetSize();
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5082,8 +5177,10 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
- &WriteToDiskBytes,
- &ChunkCountWritten,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5113,12 +5210,15 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
+ DiskStats,
+ WriteChunkStats);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten +=
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
+ WritePartsComplete++;
if (!AbortFlag)
{
- ChunkCountWritten++;
- WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5126,12 +5226,20 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -5151,11 +5259,10 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- BytesDownloaded,
- MultipartAttachmentCount,
+ DownloadStats,
[&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5168,12 +5275,11 @@ namespace {
WritePool,
std::move(Payload),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
});
}
else
@@ -5186,10 +5292,10 @@ namespace {
throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
uint64_t BlobSize = BuildBlob.GetSize();
- BytesDownloaded += BlobSize;
-
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5202,12 +5308,11 @@ namespace {
WritePool,
std::move(BuildBlob),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
}
}
}
@@ -5215,6 +5320,186 @@ namespace {
Work.DefaultErrorFunction());
}
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(),//
+ [&, CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
+
+ FilteredWrittenBytesPerSecond.Start();
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
+ const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
+
+ uint64_t CacheLocalFileBytesRead = 0;
+
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
+
+ std::vector<WriteOp> WriteOps;
+
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkIndex = ChunkTarget.RemoteChunkIndex});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
+
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(LocalFilePath, DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+ {
+ break;
+ }
+ if (NextOp.Target->Offset != OpTargetEnd)
+ {
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > 512u * 1024u)
+ {
+ break;
+ }
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
+
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++)
+ {
+ const WriteOp& WrittenOp = WriteOps[WrittenOpIndex];
+ if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second)
+ {
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten +=
+ RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex];
+ }
+ }
+
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+
+ WriteOpIndex += WriteCount;
+ }
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const WriteOp& Op : WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
+ RemoteContent,
+ CompletedChunkSequences,
+ Work,
+ WritePool);
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
if (AbortFlag)
@@ -5244,11 +5529,13 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
@@ -5291,11 +5578,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5381,26 +5667,28 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockPartialBuffer)),
BlockRange.ChunkBlockIndexStart,
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(
fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5436,11 +5724,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5488,15 +5775,17 @@ namespace {
{
Work.ScheduleWork(
WritePool, // WritePool, GetSyncWorkerPool()
- [&RemoteContent,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
&RemoteLookup,
CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
&BlockDescriptions,
- &ChunkCountWritten,
- &WriteToDiskBytes,
+ &WriteChunkStats,
+ &DiskStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5529,23 +5818,26 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
+
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5559,35 +5851,32 @@ namespace {
Work.DefaultErrorFunction());
}
- ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
- NiceBytes(AllBlocksFetch),
- NiceBytes(AllBlocksSlack),
- NiceBytes(AllBlockChunksSize),
- NiceBytes(AllBlocksSize),
- AllBlockRequests,
- BlocksNeededCount);
{
ZEN_TRACE_CPU("WriteChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
- FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
+ ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
+ uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+ +DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
- RequestsComplete.load(),
+ DownloadStats.RequestsCompleteCount.load(),
TotalRequestCount,
- NiceBytes(BytesDownloaded.load()),
+ NiceBytes(DownloadedBytes),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- ChunkCountWritten.load(),
+ WriteChunkStats.ChunkCountWritten.load(),
ChunkCountToWrite,
- NiceBytes(WriteToDiskBytes.load()),
+ NiceBytes(DiskStats.WriteByteCount.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState({.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
+ WriteProgressBar.UpdateState(
+ {.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
+ false);
});
}
@@ -5617,25 +5906,21 @@ namespace {
}
ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
+ +DownloadStats.DownloadedPartialBlockByteCount.load();
ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. Completed in {}",
- NiceBytes(BytesDownloaded.load()),
- NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceBytes(WriteToDiskBytes.load()),
- NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())),
NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
- }
- std::vector<std::pair<IoHash, uint32_t>> Targets;
- Targets.reserve(RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
- {
- Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
- std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
- return Lhs.first < Rhs.first;
- });
// Move all files we will reuse to cache folder
// TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
@@ -5662,6 +5947,7 @@ namespace {
if (WipeTargetFolder)
{
ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
+ Stopwatch Timer;
// Clean target folder
ZEN_CONSOLE("Wiping {}", Path);
@@ -5669,10 +5955,12 @@ namespace {
{
ZEN_WARN("Some files in {} could not be removed", Path);
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
else
{
ZEN_TRACE_CPU("UpdateFolder_RemoveUnused");
+ Stopwatch Timer;
// Remove unused tracked files
tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
@@ -5702,10 +5990,12 @@ namespace {
std::filesystem::remove(LocalFilePath);
}
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
{
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
+ Stopwatch Timer;
WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -5719,6 +6009,16 @@ namespace {
std::atomic<uint64_t> TargetsComplete = 0;
+ std::vector<std::pair<IoHash, uint32_t>> Targets;
+ Targets.reserve(RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
+ return Lhs.first < Rhs.first;
+ });
+
size_t TargetOffset = 0;
while (TargetOffset < Targets.size())
{
@@ -5772,6 +6072,7 @@ namespace {
SetFileReadOnly(FirstTargetFilePath, false);
}
std::filesystem::rename(CacheFilePath, FirstTargetFilePath);
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
}
OutLocalFolderState.Attributes[FirstTargetPathIndex] =
@@ -5800,6 +6101,7 @@ namespace {
SetFileReadOnly(ExtraTargetFilePath, false);
}
CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
OutLocalFolderState.Attributes[ExtraTargetPathIndex] =
RemoteContent.Attributes.empty()
@@ -5834,6 +6136,8 @@ namespace {
});
}
+ RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
if (AbortFlag)
{
return;
@@ -6493,13 +6797,21 @@ namespace {
}
else
{
- ExtendableStringBuilder<128> SB;
+ ExtendableStringBuilder<128> BuildPartString;
for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts)
{
- SB.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
+ BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
FolderContent LocalFolderState;
+
+ DiskStatistics DiskStats;
+ CacheMappingStatistics CacheMappingStats;
+ DownloadStatistics DownloadStats;
+ WriteChunkStatistics WriteChunkStats;
+ RebuildFolderStateStatistics RebuildFolderStateStats;
+ VerifyFolderStatistics VerifyFolderStats;
+
UpdateFolder(Storage,
BuildId,
Path,
@@ -6511,11 +6823,16 @@ namespace {
LooseChunkHashes,
AllowPartialBlockRequests,
WipeTargetFolder,
- LocalFolderState);
+ LocalFolderState,
+ DiskStats,
+ CacheMappingStats,
+ DownloadStats,
+ WriteChunkStats,
+ RebuildFolderStateStats);
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path, PostDownloadVerify);
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
@@ -6529,8 +6846,37 @@ namespace {
CompactBinaryToJson(StateObject, SB);
WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
+ const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
+ DownloadStats.DownloadedPartialBlockCount.load();
+ const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+ DownloadStats.DownloadedPartialBlockByteCount.load();
+ const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs();
- ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ ZEN_CONSOLE(
+ "Downloaded build {}, parts:{} in {}\n"
+ " Download: {} ({}) {}bits/s\n"
+ " Write: {} ({}) {}B/s\n"
+ " Clean: {}\n"
+ " Finalize: {}\n"
+ " Verify: {}",
+ BuildId,
+ BuildPartString.ToView(),
+ NiceTimeSpanMs(DownloadTimeMs),
+
+ DownloadCount,
+ NiceBytes(DownloadByteCount),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
+
+ DiskStats.WriteCount.load(),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
}
if (CleanDirectory(ZenTempFolder, {}))
@@ -6708,7 +7054,7 @@ BuildsCommand::BuildsCommand()
m_Options.add_options()("h,help", "Print help");
auto AddAuthOptions = [this](cxxopts::Options& Ops) {
- Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value<std::filesystem::path>(m_SystemRootDir), "<systemdir>");
+ Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value<std::string>(m_SystemRootDir), "<systemdir>");
// Direct access token (may expire)
Ops.add_option("auth-token",
@@ -7050,7 +7396,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto CreateAuthMgr = [&]() {
if (!Auth)
{
- std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : m_SystemRootDir;
+ std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir);
if (m_EncryptionKey.empty())
{
@@ -7172,8 +7518,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Querying builds in folder '{}'.", StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
}
else
{
@@ -7224,9 +7571,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (m_BuildPartName.empty())
{
- m_BuildPartName = m_Path.filename().string();
+ m_BuildPartName = Path.filename().string();
}
const bool GeneratedBuildId = m_BuildId.empty();
@@ -7266,26 +7615,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'",
m_BuildPartName,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
else if (!m_StoragePath.empty())
{
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'",
m_BuildPartName,
- m_Path,
- m_StoragePath,
+ Path,
+ StoragePath,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7328,7 +7678,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartId,
m_BuildPartName,
- m_Path,
+ Path,
m_ManifestPath,
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
@@ -7397,6 +7747,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
BuildStorage::Statistics StorageStats;
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
@@ -7404,19 +7756,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Downloading Build '{}' to '{}' from {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
BuildId,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Downloading Build '{}' to '{}' from folder {}", BuildId, m_Path, m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Downloading Build '{}' to '{}' from folder {}", BuildId, Path, StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7427,7 +7780,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartIds,
m_BuildPartNames,
- m_Path,
+ Path,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
m_Clean,
@@ -7466,7 +7819,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
}
- DiffFolders(m_Path, m_DiffPath, m_OnlyChunked);
+ std::filesystem::path Path = StringToPath(m_Path);
+ DiffFolders(Path, m_DiffPath, m_OnlyChunked);
return AbortFlag ? 11 : 0;
}
@@ -7491,6 +7845,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// "07d3964f919d577a321a1fdd",
// "07d396a6ce875004e16b9528"};
+ std::filesystem::path Path = StringToPath(m_Path);
+
BuildStorage::Statistics StorageStats;
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
@@ -7498,25 +7854,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Downloading {} to '{}' from {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
FormatArray<std::string>(m_BuildIds, " "sv),
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Downloading {}'to '{}' from folder '{}'", FormatArray<std::string>(m_BuildIds, " "sv), m_Path, m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Downloading '{}' to '{}' from folder '{}'", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
+ Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
{
Oid BuildId = Oid::FromHexString(BuildIdString);
@@ -7528,7 +7886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
{},
{},
- m_Path,
+ Path,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
BuildIdString == m_BuildIds.front(),
@@ -7540,6 +7898,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\n");
}
+ ZEN_CONSOLE("Completed in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
return 0;
}
@@ -7555,8 +7914,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
m_BuildId = Oid::NewOid().ToString();
- m_BuildPartName = m_Path.filename().string();
+ m_BuildPartName = Path.filename().string();
m_BuildPartId = Oid::NewOid().ToString();
m_CreateBuild = true;
@@ -7566,16 +7927,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+
+ if (m_BuildsUrl.empty() && StoragePath.empty())
{
- m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore";
- CreateDirectories(m_StoragePath);
- CleanDirectory(m_StoragePath, {});
+ m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").generic_string();
+ CreateDirectories(StoragePath);
+ CleanDirectory(StoragePath, {});
}
auto _ = MakeGuard([&]() {
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
+ if (m_BuildsUrl.empty() && StoragePath.empty())
{
- DeleteDirectories(m_StoragePath);
+ DeleteDirectories(StoragePath);
}
});
@@ -7583,24 +7946,24 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
- else if (!m_StoragePath.empty())
+ else if (!StoragePath.empty())
{
ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- m_Path,
- m_StoragePath,
+ Path,
+ StoragePath,
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7632,7 +7995,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartId,
m_BuildPartName,
- m_Path,
+ Path,
{},
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
@@ -7646,7 +8009,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 11;
}
- const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
+ const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
@@ -7708,27 +8071,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
case 0:
{
uint64_t SourceSize = *FileSizeIt;
- if (SourceSize > 0)
+ if (SourceSize > 256)
{
Work.ScheduleWork(
GetMediumWorkerPool(EWorkloadType::Burst),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
- IoBuffer Scrambled(SourceSize);
+ bool IsReadOnly = SetFileReadOnly(FilePath, false);
{
- IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath);
- Scrambled.GetMutableView().CopyFrom(
- Source.GetView().Mid(SourceSize / 3, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid(SourceSize / 3)
- .CopyFrom(Source.GetView().Mid(0, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid((SourceSize / 3) * 2)
- .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3));
+ BasicFile Source(FilePath, BasicFile::Mode::kWrite);
+ uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u);
+ IoBuffer TempBuffer1(RangeSize);
+ IoBuffer TempBuffer2(RangeSize);
+ IoBuffer TempBuffer3(RangeSize);
+ Source.Read(TempBuffer1.GetMutableView().GetData(), RangeSize, 0);
+ Source.Read(TempBuffer2.GetMutableView().GetData(), RangeSize, SourceSize / 2);
+ Source.Read(TempBuffer3.GetMutableView().GetData(),
+ RangeSize,
+ SourceSize - RangeSize);
+ Source.Write(TempBuffer1, SourceSize / 2);
+ Source.Write(TempBuffer2, SourceSize - RangeSize);
+ Source.Write(TempBuffer3, SourceSize - 0);
}
- bool IsReadOnly = SetFileReadOnly(FilePath, false);
- WriteFile(FilePath, Scrambled);
if (IsReadOnly)
{
SetFileReadOnly(FilePath, true);
@@ -7870,6 +8235,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (!m_BuildsUrl.empty())
{
ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
@@ -7878,14 +8245,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7937,6 +8305,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (!m_BuildsUrl.empty())
{
ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
@@ -7945,14 +8315,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7960,11 +8331,21 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateStatistics ValidateStats;
+ DownloadStatistics DownloadStats;
+ ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}
}
+ catch (const ParallellWorkException& Ex)
+ {
+ for (const std::string& Error : Ex.m_Errors)
+ {
+ ZEN_ERROR("{}", Error);
+ }
+ return 3;
+ }
catch (const std::exception& Ex)
{
ZEN_ERROR("{}", Ex.what());