aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-18 08:56:40 +0100
committerGitHub Enterprise <[email protected]>2025-03-18 08:56:40 +0100
commitfa4ef162b1dd53cbad135850a8f9cf8fb532f395 (patch)
tree53d0da390d49a7e6a8dd1f3dcc14d8d48a8acbba /src
parentfix quoted command lines arguments (#306) (diff)
downloadzen-fa4ef162b1dd53cbad135850a8f9cf8fb532f395.tar.xz
zen-fa4ef162b1dd53cbad135850a8f9cf8fb532f395.zip
improved post upload/download summary (#308)
* added ValidateStatistics and improved post upload summary * improved download statistics * smoother stats update when compressing * better feedback during stream compress/decompress * don't capture TotalPartWriteCount by reference * disk stats cleanup * multi-test-download overall timer
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp962
-rw-r--r--src/zencore/compress.cpp130
-rw-r--r--src/zencore/include/zencore/compress.h18
3 files changed, 690 insertions, 420 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index f0ee4904e..61e3c0fab 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -567,6 +567,64 @@ namespace {
}
};
+ struct CacheMappingStatistics
+ {
+ uint64_t CacheChunkCount = 0;
+ uint64_t CacheChunkByteCount = 0;
+
+ uint64_t CacheBlockCount = 0;
+ uint64_t CacheBlocksByteCount = 0;
+
+ uint64_t CacheSequenceHashesCount = 0;
+ uint64_t CacheSequenceHashesByteCount = 0;
+
+ uint32_t LocalPathsMatchingSequencesCount = 0;
+ uint64_t LocalPathsMatchingSequencesByteCount = 0;
+
+ uint64_t LocalChunkMatchingRemoteCount = 0;
+ uint64_t LocalChunkMatchingRemoteByteCount = 0;
+ };
+
+ struct DownloadStatistics
+ {
+ std::atomic<uint64_t> RequestsCompleteCount = 0;
+
+ std::atomic<uint64_t> DownloadedChunkCount = 0;
+ std::atomic<uint64_t> DownloadedChunkByteCount = 0;
+ std::atomic<uint64_t> MultipartAttachmentCount = 0;
+
+ std::atomic<uint64_t> DownloadedBlockCount = 0;
+ std::atomic<uint64_t> DownloadedBlockByteCount = 0;
+
+ std::atomic<uint64_t> DownloadedPartialBlockCount = 0;
+ std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0;
+ };
+
+ struct WriteChunkStatistics
+ {
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<uint64_t> ChunkBytesWritten = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
+ };
+
+ struct RebuildFolderStateStatistics
+ {
+ uint64_t CleanFolderElapsedWallTimeUs = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0;
+ uint64_t FinalizeTreeElapsedWallTimeUs = 0;
+ };
+
+ struct VerifyFolderStatistics
+ {
+ std::atomic<uint64_t> FilesVerified = 0;
+ std::atomic<uint64_t> FilesFailed = 0;
+ std::atomic<uint64_t> ReadBytes = 0;
+ uint64_t VerifyElapsedWallTimeUs = 0;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -1016,7 +1074,15 @@ namespace {
class BufferedOpenFile
{
public:
- BufferedOpenFile(const std::filesystem::path Path) : Source(Path, BasicFile::Mode::kRead), SourceSize(Source.FileSize()) {}
+ BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats)
+ : m_Source(Path, BasicFile::Mode::kRead)
+ , m_SourceSize(m_Source.FileSize())
+ , m_DiskStats(DiskStats)
+ {
+ m_DiskStats.OpenReadCount++;
+ m_DiskStats.CurrentOpenFileCount++;
+ }
+ ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; }
BufferedOpenFile() = delete;
BufferedOpenFile(const BufferedOpenFile&) = delete;
BufferedOpenFile(BufferedOpenFile&&) = delete;
@@ -1028,10 +1094,10 @@ namespace {
{
ZEN_TRACE_CPU("BufferedOpenFile::GetRange");
- ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache);
- auto _ = MakeGuard([&]() { ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); });
+ ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache);
+ auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); });
- ZEN_ASSERT((Offset + Size) <= SourceSize);
+ ZEN_ASSERT((Offset + Size) <= m_SourceSize);
const uint64_t BlockIndexStart = Offset / BlockSize;
const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize;
@@ -1042,21 +1108,23 @@ namespace {
for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++)
{
const uint64_t BlockStartOffset = BlockIndex * BlockSize;
- if (CacheBlockIndex != BlockIndex)
+ if (m_CacheBlockIndex != BlockIndex)
{
- uint64_t CacheSize = Min(BlockSize, SourceSize - BlockStartOffset);
+ uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset);
ZEN_ASSERT(CacheSize > 0);
- Cache = IoBuffer(CacheSize);
- Source.Read(Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
- CacheBlockIndex = BlockIndex;
+ m_Cache = IoBuffer(CacheSize);
+ m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += CacheSize;
+ m_CacheBlockIndex = BlockIndex;
}
const uint64_t BytesRead = ReadOffset - Offset;
ZEN_ASSERT(BlockStartOffset <= ReadOffset);
const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset;
- ZEN_ASSERT(OffsetIntoBlock < Cache.GetSize());
- const uint64_t BlockBytes = Min(Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
- BufferRanges.emplace_back(SharedBuffer(IoBuffer(Cache, OffsetIntoBlock, BlockBytes)));
+ ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize());
+ const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
+ BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes)));
ReadOffset += BlockBytes;
}
CompositeBuffer Result(std::move(BufferRanges));
@@ -1065,10 +1133,11 @@ namespace {
}
private:
- BasicFile Source;
- const uint64_t SourceSize;
- uint64_t CacheBlockIndex = (uint64_t)-1;
- IoBuffer Cache;
+ BasicFile m_Source;
+ const uint64_t m_SourceSize;
+ DiskStatistics& m_DiskStats;
+ uint64_t m_CacheBlockIndex = (uint64_t)-1;
+ IoBuffer m_Cache;
};
class ReadFileCache
@@ -1087,11 +1156,7 @@ namespace {
{
m_OpenFiles.reserve(MaxOpenFileCount);
}
- ~ReadFileCache()
- {
- m_DiskStats.CurrentOpenFileCount -= m_OpenFiles.size();
- m_OpenFiles.clear();
- }
+ ~ReadFileCache() { m_OpenFiles.clear(); }
CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
{
@@ -1109,7 +1174,6 @@ namespace {
m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
}
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
return Result;
}
const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
@@ -1117,20 +1181,15 @@ namespace {
if (Size == m_LocalContent.RawSizes[LocalPathIndex])
{
IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
- m_DiskStats.OpenReadCount++;
- m_DiskStats.ReadByteCount += Result.GetSize();
return CompositeBuffer(SharedBuffer(Result));
}
if (m_OpenFiles.size() == m_OpenFiles.capacity())
{
m_OpenFiles.pop_back();
- m_DiskStats.CurrentOpenFileCount--;
}
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath)));
+ m_OpenFiles.insert(m_OpenFiles.begin(),
+ std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats)));
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
- m_DiskStats.OpenReadCount++;
- m_DiskStats.CurrentOpenFileCount++;
return Result;
}
@@ -1167,17 +1226,21 @@ namespace {
}
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ RawSize,
+ [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
}
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -1330,8 +1393,7 @@ namespace {
const std::uint64_t PreferredMultipartChunkSize,
ParallellWork& Work,
WorkerThreadPool& NetworkPool,
- std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& MultipartAttachmentCount,
+ DownloadStatistics& DownloadStats,
std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -1353,10 +1415,10 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [Workload, &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
- const IoBuffer& Chunk,
- uint64_t BytesRemaining) {
- BytesDownloaded += Chunk.GetSize();
+ [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
+ const IoBuffer& Chunk,
+ uint64_t BytesRemaining) {
+ DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
if (!AbortFlag.load())
{
@@ -1364,6 +1426,7 @@ namespace {
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
+ DownloadStats.DownloadedChunkCount++;
uint64_t PayloadSize = Workload->TempFile.FileSize();
void* FileHandle = Workload->TempFile.Detach();
ZEN_ASSERT(FileHandle != nullptr);
@@ -1375,7 +1438,7 @@ namespace {
});
if (!WorkItems.empty())
{
- MultipartAttachmentCount++;
+ DownloadStats.MultipartAttachmentCount++;
}
for (auto& WorkItem : WorkItems)
{
@@ -1392,7 +1455,23 @@ namespace {
}
}
- void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
+
+ void ValidateBuildPart(BuildStorage& Storage,
+ const Oid& BuildId,
+ Oid BuildPartId,
+ const std::string_view BuildPartName,
+ ValidateStatistics& ValidateStats,
+ DownloadStatistics& DownloadStats)
{
Stopwatch Timer;
auto _ = MakeGuard([&]() {
@@ -1411,23 +1490,27 @@ namespace {
throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
}
}
+ ValidateStats.BuildBlobSize = Build.GetSize();
uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
{
PreferredMultipartChunkSize = ChunkSize;
}
- CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ ValidateStats.BuildPartSize = BuildPart.GetSize();
ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
std::vector<IoHash> ChunkAttachments;
for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
{
ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
}
+ ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
std::vector<IoHash> BlockAttachments;
for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
{
BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
}
+ ValidateStats.BlockAttachmentCount = BlockAttachments.size();
std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
@@ -1453,13 +1536,9 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
- std::atomic<uint64_t> DownloadedAttachmentCount = 0;
- std::atomic<uint64_t> VerifiedAttachmentCount = 0;
- std::atomic<uint64_t> DownloadedByteCount = 0;
- std::atomic<uint64_t> VerifiedByteCount = 0;
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredVerifiedBytesPerSecond;
+ uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredVerifiedBytesPerSecond;
std::atomic<uint64_t> MultipartAttachmentCount = 0;
@@ -1480,8 +1559,7 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- DownloadedByteCount,
- MultipartAttachmentCount,
+ DownloadStats,
[&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
Payload.SetContentType(ZenContentType::kCompressedBinary);
if (!AbortFlag)
@@ -1493,6 +1571,12 @@ namespace {
{
ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
@@ -1502,9 +1586,9 @@ namespace {
ChunkHash,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1529,9 +1613,9 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadedAttachmentCount++;
- DownloadedByteCount += Payload.GetSize();
- if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -1557,9 +1641,9 @@ namespace {
BlockAttachment,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1575,17 +1659,20 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
+ const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
+
FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
- FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount);
+ FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount);
std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
- DownloadedAttachmentCount.load(),
+ DownloadedAttachmentCount,
AttachmentsToVerifyCount,
- NiceBytes(DownloadedByteCount.load()),
+ NiceBytes(DownloadedByteCount),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- VerifiedAttachmentCount.load(),
+ ValidateStats.VerifiedAttachmentCount.load(),
AttachmentsToVerifyCount,
- NiceBytes(VerifiedByteCount.load()),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
ProgressBar.UpdateState(
@@ -1593,11 +1680,12 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
false);
});
ProgressBar.Finish();
+ ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
}
void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
@@ -1698,7 +1786,9 @@ namespace {
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
uint32_t ChunkIndex,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ std::atomic<uint64_t>& ReadRawBytes,
+ LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("CompressChunk");
ZEN_ASSERT(!TempFolderPath.empty());
@@ -1731,7 +1821,12 @@ namespace {
bool CouldCompress = CompressedBuffer::CompressToStream(
CompositeBuffer(SharedBuffer(RawSource)),
- [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); });
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ReadRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset);
+ LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ });
if (CouldCompress)
{
uint64_t CompressedSize = CompressedFile.FileSize();
@@ -1749,6 +1844,9 @@ namespace {
ZEN_ASSERT(Compressed);
ZEN_ASSERT(RawHash == ChunkHash);
ZEN_ASSERT(RawSize == ChunkSize);
+
+ LooseChunksStats.CompressedChunkCount++;
+
return Compressed.GetCompressed();
}
CompressedFile.Close();
@@ -1988,7 +2086,6 @@ namespace {
const std::uint64_t LargeAttachmentSize,
DiskStatistics& DiskStats,
UploadStatistics& UploadStats,
- GenerateBlocksStatistics& GenerateBlocksStats,
LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("UploadPartBlobs");
@@ -2238,8 +2335,6 @@ namespace {
Payload = std::move(CompressedBlock).GetCompressed();
}
- GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
GeneratedBlockCount++;
if (GeneratedBlockCount == GenerateBlockIndexes.size())
@@ -2260,9 +2355,7 @@ namespace {
}
}
- std::atomic<uint64_t> CompressedLooseChunkCount = 0;
- std::atomic<uint64_t> CompressedLooseChunkByteCount = 0;
- std::atomic<uint64_t> RawLooseChunkByteCount = 0;
+ std::atomic<uint64_t> RawLooseChunkByteCount = 0;
// Start compression of any non-precompressed loose chunks and schedule upload
for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes)
@@ -2276,19 +2369,20 @@ namespace {
ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
FilteredCompressedBytesPerSecond.Start();
- CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName);
+ CompositeBuffer Payload = CompressChunk(Path,
+ Content,
+ Lookup,
+ ChunkIndex,
+ Path / ZenTempChunkFolderName,
+ RawLooseChunkByteCount,
+ LooseChunksStats);
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})",
Content.ChunkedContent.ChunkHashes[ChunkIndex],
NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
NiceBytes(Payload.GetSize()));
const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
UploadStats.ReadFromDiskBytes += ChunkRawSize;
- LooseChunksStats.CompressedChunkBytes += Payload.GetSize();
- LooseChunksStats.CompressedChunkCount++;
- CompressedLooseChunkByteCount += Payload.GetSize();
- CompressedLooseChunkCount++;
- RawLooseChunkByteCount += ChunkRawSize;
- if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size())
+ if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size())
{
FilteredCompressedBytesPerSecond.Stop();
}
@@ -2303,20 +2397,21 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load());
+ FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
std::string Details = fmt::format(
- "Compressed {}/{} ({}/{}) chunks. "
+ "Compressed {}/{} ({}/{} {}B/s) chunks. "
"Uploaded {}/{} ({}/{}) blobs "
"({} {}bits/s)",
- CompressedLooseChunkCount.load(),
+ LooseChunksStats.CompressedChunkCount.load(),
CompressLooseChunkOrderIndexes.size(),
NiceBytes(RawLooseChunkByteCount),
NiceBytes(TotalLooseChunksSize),
+ NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
UploadedBlockCount.load() + UploadedChunkCount.load(),
UploadBlockCount + UploadChunkCount,
@@ -2336,9 +2431,8 @@ namespace {
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2531,6 +2625,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName);
+ std::uint64_t TotalRawSize = 0;
+
CbObject ChunkerParameters;
struct PrepareBuildResult
@@ -2751,7 +2847,7 @@ namespace {
ChunkerParameters = ChunkParametersWriter.Save();
}
- std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
{
ProgressBar ProgressBar(UsePlainProgress);
@@ -3120,21 +3216,16 @@ namespace {
{
ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray<IoHash>(RawHashes, "\n "sv));
- UploadStatistics TempUploadStats;
- GenerateBlocksStatistics TempGenerateBlocksStats;
- LooseChunksStatistics TempLooseChunksStats;
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
Stopwatch TempUploadTimer;
auto __ = MakeGuard([&]() {
uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
ZEN_CONSOLE(
- "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. "
+ "Uploaded {} ({}) blocks. "
"Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
"Transferred {} ({}bits/s) in {}",
- TempGenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()),
- NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
- TempGenerateBlocksStats.GeneratedBlockByteCount)),
TempUploadStats.BlockCount.load(),
NiceBytes(TempUploadStats.BlocksBytes),
@@ -3161,11 +3252,9 @@ namespace {
LargeAttachmentSize,
DiskStats,
TempUploadStats,
- TempGenerateBlocksStats,
TempLooseChunksStats);
UploadStats += TempUploadStats;
LooseChunksStats += TempLooseChunksStats;
- GenerateBlocksStats += TempGenerateBlocksStats;
}
};
if (IgnoreExistingBlocks)
@@ -3247,18 +3336,25 @@ namespace {
}
}
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName);
+ ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
- const double DeltaByteCountPercent =
- ChunkingStats.BytesHashed > 0
- ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
- : 0.0;
-
- const std::string LargeAttachmentStats =
- (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedByteCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
ZEN_CONSOLE_VERBOSE(
"Folder scanning stats:"
@@ -3382,42 +3478,122 @@ namespace {
UploadStats.MultipartAttachmentCount.load(),
NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000));
+ if (PostUploadVerify)
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Validate stats:"
+ "\n BuildBlobSize: {}"
+ "\n BuildPartSize: {}"
+ "\n ChunkAttachmentCount: {}"
+ "\n BlockAttachmentCount: {}"
+ "\n VerifiedAttachmentCount: {}"
+ "\n VerifiedByteCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ NiceBytes(ValidateStats.BuildBlobSize),
+ NiceBytes(ValidateStats.BuildPartSize),
+ ValidateStats.ChunkAttachmentCount,
+ ValidateStats.BlockAttachmentCount,
+ ValidateStats.VerifiedAttachmentCount.load(),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
+ NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE_VERBOSE(
+ "Validate download stats:"
+ "\n RequestsCompleteCount: {}"
+ "\n DownloadedChunkCount: {}"
+ "\n DownloadedChunkByteCount: {}"
+ "\n MultipartAttachmentCount: {}"
+ "\n DownloadedBlockCount: {}"
+ "\n DownloadedBlockByteCount: {}"
+ "\n DownloadedPartialBlockCount: {}"
+ "\n DownloadedPartialBlockByteCount: {}",
+ ValidateDownloadStats.RequestsCompleteCount.load(),
+ ValidateDownloadStats.DownloadedChunkCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()),
+ ValidateDownloadStats.MultipartAttachmentCount.load(),
+ ValidateDownloadStats.DownloadedBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()),
+ ValidateDownloadStats.DownloadedPartialBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load()));
+ }
+
+ const double DeltaByteCountPercent =
+ ChunkingStats.BytesHashed > 0
+ ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
+ : 0.0;
+
+ const std::string MultipartAttachmentStats =
+ (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+
+ std::string ValidateInfo;
+ if (PostUploadVerify)
+ {
+ const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount =
+ ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ ValidateInfo = fmt::format("\n Verified: {} ({}) {}B/sec in {}",
+ DownloadedCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ }
+
ZEN_CONSOLE(
- "Uploaded {}\n"
- " Delta: {}/{} ({:.1f}%)\n"
- " Blocks: {} ({})\n"
- " Chunks: {} ({}){}\n"
- " Rate: {}bits/sec",
- NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ "Uploaded part {} ('{}') to build {} in {} \n"
+ " Scanned files: {} ({}) {}B/sec in {}\n"
+ " New data: {} ({:.1f}%)\n"
+ " New blocks: {} ({}) {}B/sec\n"
+ " New chunks: {} ({} -> {}) {}B/sec\n"
+ " Uploaded: {} ({}) {}bits/sec\n"
+ " Blocks: {} ({})\n"
+ " Chunks: {} ({}){}"
+ "{}",
+ BuildPartId,
+ BuildPartName,
+ BuildId,
+ NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()),
+
+ LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()),
+ NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)),
+ NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000),
NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- NiceBytes(ChunkingStats.BytesHashed),
DeltaByteCountPercent,
+ GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)),
+
+ LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(LooseChunksStats.ChunkByteCount),
+ NiceBytes(LooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)),
+
+ NiceBytes(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load()),
+ NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))),
+
UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes),
- UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.ChunksBytes),
- LargeAttachmentStats,
+ NiceBytes(UploadStats.BlocksBytes.load()),
- NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))));
+ UploadStats.ChunkCount.load(),
+ NiceBytes(UploadStats.ChunksBytes.load()),
+ MultipartAttachmentStats,
- ZEN_CONSOLE("Uploaded ({}) build {} part {} ({}) in {}",
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- BuildId,
- BuildPartName,
- BuildPartId,
- NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()));
+ ValidateInfo);
}
- void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash)
+ void VerifyFolder(const ChunkedFolderContent& Content,
+ const std::filesystem::path& Path,
+ bool VerifyFileHash,
+ VerifyFolderStatistics& VerifyFolderStats)
{
ZEN_TRACE_CPU("VerifyFolder");
- ProgressBar ProgressBar(UsePlainProgress);
- std::atomic<uint64_t> FilesVerified(0);
- std::atomic<uint64_t> FilesFailed(0);
- std::atomic<uint64_t> ReadBytes(0);
+ Stopwatch Timer;
+
+ ProgressBar ProgressBar(UsePlainProgress);
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -3473,7 +3649,7 @@ namespace {
ErrorLock.WithExclusiveLock([&]() {
Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else
{
@@ -3485,7 +3661,7 @@ namespace {
Errors.push_back(
fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk < ExpectedSize)
{
@@ -3495,7 +3671,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > ExpectedSize)
{
@@ -3505,7 +3681,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > 0 && VerifyFileHash)
{
@@ -3540,13 +3716,13 @@ namespace {
}
FileOffset += ChunkSize;
}
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
- ReadBytes += SizeOnDisk;
+ VerifyFolderStats.ReadBytes += SizeOnDisk;
}
}
}
- FilesVerified++;
+ VerifyFolderStats.FilesVerified++;
}
},
[&, PathIndex](const std::exception& Ex, std::atomic<bool>&) {
@@ -3555,23 +3731,25 @@ namespace {
(Path / Content.Paths[PathIndex]).make_preferred(),
Ex.what()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
});
}
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
- FilesVerified.load(),
+ VerifyFolderStats.FilesVerified.load(),
PathCount,
- NiceBytes(ReadBytes.load()),
- FilesFailed.load());
+ NiceBytes(VerifyFolderStats.ReadBytes.load()),
+ VerifyFolderStats.FilesFailed.load());
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
false);
});
+ VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
ProgressBar.Finish();
for (const std::string& Error : Errors)
{
@@ -3586,7 +3764,7 @@ namespace {
class WriteFileCache
{
public:
- WriteFileCache() {}
+ WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
~WriteFileCache() { Flush(); }
template<typename TBufferType>
@@ -3602,6 +3780,8 @@ namespace {
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
ZEN_ASSERT(OpenFileWriter);
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
}
else
{
@@ -3624,6 +3804,8 @@ namespace {
}
return --Tries > 0;
});
+ m_DiskStats.OpenWriteCount++;
+ m_DiskStats.CurrentOpenFileCount++;
}
const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
@@ -3635,12 +3817,18 @@ namespace {
OutputFile = std::move(NewOutputFile);
OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
SeenTargetIndexes.push_back(TargetIndex);
}
else
{
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
NewOutputFile->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
+ NewOutputFile = {};
+ m_DiskStats.CurrentOpenFileCount--;
}
}
}
@@ -3648,9 +3836,15 @@ namespace {
void Flush()
{
ZEN_TRACE_CPU("WriteFileCache_Flush");
+ if (OutputFile)
+ {
+ m_DiskStats.CurrentOpenFileCount--;
+ }
+
OpenFileWriter = {};
OutputFile = {};
}
+ DiskStatistics& m_DiskStats;
std::vector<uint32_t> SeenTargetIndexes;
std::unique_ptr<BasicFile> OutputFile;
std::unique_ptr<BasicFileWriter> OpenFileWriter;
@@ -3693,12 +3887,12 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
- WriteFileCache OpenFileCache;
+ WriteFileCache OpenFileCache(DiskStats);
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
if (AbortFlag)
@@ -3724,8 +3918,13 @@ namespace {
Chunk,
FileOffset,
RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
}
+ WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ WriteChunkStats.ChunkBytesWritten +=
+ std::accumulate(Ops.ChunkBuffers.begin(),
+ Ops.ChunkBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
@@ -3753,7 +3952,6 @@ namespace {
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
}
- OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
}
}
@@ -3865,8 +4063,8 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -3899,8 +4097,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
@@ -3925,8 +4123,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
@@ -3941,8 +4139,8 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
BlockWriteOps Ops;
@@ -3963,8 +4161,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ DiskStats,
+ WriteChunkStats);
return true;
}
else
@@ -4012,8 +4210,7 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
CompositeBuffer&& ChunkData,
- WriteFileCache& OpenFileCache,
- std::atomic<uint64_t>& OutBytesWritten)
+ WriteFileCache& OpenFileCache)
{
ZEN_TRACE_CPU("WriteChunkToDisk");
@@ -4032,7 +4229,6 @@ namespace {
ChunkData,
FileOffset,
Content.RawSizes[PathIndex]);
- OutBytesWritten += ChunkData.GetSize();
}
}
@@ -4053,7 +4249,8 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4076,21 +4273,29 @@ namespace {
{
throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
}
+
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_TRACE_CPU("StreamDecompress_Write");
- if (!AbortFlag)
- {
- DecompressedTemp.Write(RangeBuffer, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
+ DecompressedTemp.Write(RangeBuffer, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ DiskStats.WriteByteCount += RangeBuffer.GetSize();
+ DiskStats.WriteCount++;
+ return true;
}
- WriteToDiskBytes += RangeBuffer.GetSize();
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -4113,6 +4318,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
+ WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4121,32 +4327,34 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes);
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
}
else
{
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- SharedBuffer Chunk =
- Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize);
if (!AbortFlag)
{
- WriteFileCache OpenFileCache;
-
+ WriteFileCache OpenFileCache(DiskStats);
WriteChunkToDisk(TargetFolder,
RemoteContent,
RemoteLookup,
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
- OpenFileCache,
- WriteToDiskBytes);
+ OpenFileCache);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
@@ -4198,19 +4406,17 @@ namespace {
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::atomic<uint64_t>& WriteToDiskBytes,
- std::atomic<uint32_t>& ChunkCountWritten,
std::atomic<uint64_t>& WritePartsComplete,
- std::atomic<uint64_t>& TotalPartWriteCount,
- std::atomic<uint64_t>& LooseChunksBytes,
- FilteredRate& FilteredWrittenBytesPerSecond)
+ const uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredWrittenBytesPerSecond,
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- uint64_t Size = Payload.GetSize();
- LooseChunksBytes += Size;
+ const uint64_t Size = Payload.GetSize();
std::filesystem::path CompressedChunkPath;
@@ -4256,6 +4462,7 @@ namespace {
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
RemoteChunkIndex,
+ TotalPartWriteCount,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
@@ -4288,11 +4495,11 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
+ DiskStats,
+ WriteChunkStats);
if (!AbortFlag)
{
- ChunkCountWritten++;
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
@@ -4324,19 +4531,16 @@ namespace {
const std::vector<IoHash>& LooseChunkHashes,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
- FolderContent& OutLocalFolderState)
+ FolderContent& OutLocalFolderState,
+ DiskStatistics& DiskStats,
+ CacheMappingStatistics& CacheMappingStats,
+ DownloadStatistics& DownloadStats,
+ WriteChunkStatistics& WriteChunkStats,
+ RebuildFolderStateStatistics& RebuildFolderStateStats)
{
ZEN_TRACE_CPU("UpdateFolder");
ZEN_UNUSED(WipeTargetFolder);
- std::atomic<uint64_t> DownloadedBlocks = 0;
- std::atomic<uint64_t> BlockBytes = 0;
- std::atomic<uint64_t> DownloadedChunks = 0;
- std::atomic<uint64_t> LooseChunksBytes = 0;
- std::atomic<uint64_t> WriteToDiskBytes = 0;
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
-
- DiskStatistics DiskStats;
Stopwatch IndexTimer;
@@ -4351,15 +4555,11 @@ namespace {
Stopwatch CacheMappingTimer;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
- // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false);
- std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- uint64_t CachedChunkHashesByteCountFound = 0;
- uint64_t CachedSequenceHashesByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
@@ -4380,7 +4580,8 @@ namespace {
if (ChunkSize == CacheDirContent.FileSizes[Index])
{
CachedChunkHashesFound.insert({FileHash, ChunkIndex});
- CachedChunkHashesByteCountFound += ChunkSize;
+ CacheMappingStats.CacheChunkCount++;
+ CacheMappingStats.CacheChunkByteCount += ChunkSize;
continue;
}
}
@@ -4393,7 +4594,8 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CachedSequenceHashesByteCountFound += SequenceSize;
+                        CacheMappingStats.CacheSequenceHashesCount++;
+                        CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
continue;
}
}
@@ -4403,7 +4605,6 @@ namespace {
}
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- uint64_t CachedBlocksByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
@@ -4438,7 +4639,8 @@ namespace {
if (BlockSize == BlockDirContent.FileSizes[Index])
{
CachedBlocksFound.insert({FileHash, BlockIndex});
- CachedBlocksByteCountFound += BlockSize;
+ CacheMappingStats.CacheBlockCount++;
+ CacheMappingStats.CacheBlocksByteCount += BlockSize;
continue;
}
}
@@ -4448,7 +4650,6 @@ namespace {
}
std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0;
// Pick up all whole files we can use from current local state
{
ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
@@ -4477,7 +4678,8 @@ namespace {
const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
- LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize;
+ CacheMappingStats.LocalPathsMatchingSequencesCount++;
+ CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize;
}
else
{
@@ -4495,7 +4697,7 @@ namespace {
struct ChunkTarget
{
uint32_t TargetChunkLocationCount = (uint32_t)-1;
- uint64_t ChunkRawSize = (uint64_t)-1;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
uint64_t CacheFileOffset = (uint64_t)-1;
};
std::vector<ChunkTarget> ChunkTargets;
@@ -4503,8 +4705,6 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
- uint64_t LocalChunkHashesMatchingRemoteCount = 0;
- uint64_t LocalChunkHashesMatchingRemoteByteCount = 0;
{
ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
@@ -4537,7 +4737,7 @@ namespace {
{
CacheCopyData::ChunkTarget Target = {
.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .ChunkRawSize = LocalChunkRawSize,
+ .RemoteChunkIndex = RemoteChunkIndex,
.CacheFileOffset = SourceOffset};
if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
CopySourceIt != RawHashToCacheCopyDataIndex.end())
@@ -4567,8 +4767,8 @@ namespace {
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
}
- LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize;
- LocalChunkHashesMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
}
}
@@ -4580,20 +4780,20 @@ namespace {
}
if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() ||
- !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0)
+ !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
{
ZEN_CONSOLE(
"Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks",
CachedSequenceHashesFound.size(),
- NiceBytes(CachedSequenceHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
CachedChunkHashesFound.size(),
- NiceBytes(CachedChunkHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheChunkByteCount),
CachedBlocksFound.size(),
- NiceBytes(CachedBlocksByteCountFound),
+ NiceBytes(CacheMappingStats.CacheBlocksByteCount),
LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes),
- LocalChunkHashesMatchingRemoteCount,
- NiceBytes(LocalChunkHashesMatchingRemoteByteCount));
+ NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
}
uint32_t ChunkCountToWrite = 0;
@@ -4616,9 +4816,7 @@ namespace {
}
uint64_t TotalRequestCount = 0;
- std::atomic<uint64_t> RequestsComplete = 0;
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> TotalPartWriteCount = 0;
+ uint64_t TotalPartWriteCount = 0;
std::atomic<uint64_t> WritePartsComplete = 0;
{
@@ -4635,8 +4833,6 @@ namespace {
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
- std::atomic<uint64_t> BytesDownloaded = 0;
-
struct LooseChunkHashWorkData
{
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
@@ -4718,12 +4914,6 @@ namespace {
std::vector<uint32_t> FullBlockWorks;
- size_t BlocksNeededCount = 0;
- uint64_t AllBlocksSize = 0;
- uint64_t AllBlocksFetch = 0;
- uint64_t AllBlocksSlack = 0;
- uint64_t AllBlockRequests = 0;
- uint64_t AllBlockChunksSize = 0;
for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
@@ -4779,7 +4969,6 @@ namespace {
}
else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- AllBlockChunksSize += ChunkCompressedLength;
if (NextRange.RangeLength == 0)
{
NextRange.RangeStart = CurrentOffset;
@@ -4796,7 +4985,6 @@ namespace {
ZEN_ASSERT(false);
}
}
- AllBlocksSize += CurrentOffset;
if (NextRange.RangeLength > 0)
{
BlockRanges.push_back(NextRange);
@@ -4806,7 +4994,6 @@ namespace {
std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
auto It = BlockRanges.begin();
CollapsedBlockRanges.push_back(*It++);
- uint64_t TotalSlack = 0;
while (It != BlockRanges.end())
{
BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
@@ -4817,7 +5004,6 @@ namespace {
LastRange.ChunkBlockIndexCount =
(It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
- TotalSlack += Slack;
}
else
{
@@ -4826,17 +5012,6 @@ namespace {
++It;
}
- uint64_t TotalFetch = 0;
- for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
- {
- TotalFetch += Range.RangeLength;
- }
-
- AllBlocksFetch += TotalFetch;
- AllBlocksSlack += TotalSlack;
- BlocksNeededCount++;
- AllBlockRequests += CollapsedBlockRanges.size();
-
TotalRequestCount += CollapsedBlockRanges.size();
TotalPartWriteCount += CollapsedBlockRanges.size();
@@ -4844,7 +5019,6 @@ namespace {
}
else
{
- BlocksNeededCount++;
TotalRequestCount++;
TotalPartWriteCount++;
@@ -4888,7 +5062,7 @@ namespace {
{
const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
uint64_t CacheFileOffset = (uint64_t)-1;
- uint64_t ChunkSize = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
};
std::vector<WriteOp> WriteOps;
@@ -4905,7 +5079,7 @@ namespace {
{
WriteOps.push_back(WriteOp{.Target = Target,
.CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
+ .ChunkIndex = ChunkTarget.RemoteChunkIndex});
}
TargetStart += ChunkTarget.TargetChunkLocationCount;
}
@@ -4931,8 +5105,10 @@ namespace {
{
ZEN_TRACE_CPU("Write");
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(LocalFilePath, DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
for (const WriteOp& Op : WriteOps)
{
if (AbortFlag)
@@ -4944,7 +5120,7 @@ namespace {
RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
@@ -4959,7 +5135,13 @@ namespace {
ChunkSource,
Op.Target->Offset,
RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
+
+ if (ChunkIndexesWritten.insert(Op.ChunkIndex).second)
+ {
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten += ChunkSize;
+ }
+
CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
}
}
@@ -4992,8 +5174,6 @@ namespace {
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
}
-
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
}
WritePartsComplete++;
@@ -5038,9 +5218,8 @@ namespace {
uint64_t RawSize;
if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
{
- LooseChunksBytes += ExistingCompressedPart.GetSize();
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5063,8 +5242,8 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
- &WriteToDiskBytes,
- &ChunkCountWritten,
+ &DiskStats,
+ &WriteChunkStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5094,12 +5273,15 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
+ DiskStats,
+ WriteChunkStats);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten +=
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
+ WritePartsComplete++;
if (!AbortFlag)
{
- ChunkCountWritten++;
- WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5132,11 +5314,10 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- BytesDownloaded,
- MultipartAttachmentCount,
+ DownloadStats,
[&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5149,12 +5330,11 @@ namespace {
WritePool,
std::move(Payload),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
});
}
else
@@ -5167,10 +5347,10 @@ namespace {
throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
uint64_t BlobSize = BuildBlob.GetSize();
- BytesDownloaded += BlobSize;
-
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5183,12 +5363,11 @@ namespace {
WritePool,
std::move(BuildBlob),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
}
}
}
@@ -5228,8 +5407,8 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
@@ -5272,11 +5451,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5367,21 +5545,21 @@ namespace {
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(
fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5417,11 +5595,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5476,8 +5653,8 @@ namespace {
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
&BlockDescriptions,
- &ChunkCountWritten,
- &WriteToDiskBytes,
+ &WriteChunkStats,
+ &DiskStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5513,20 +5690,21 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
+
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5540,35 +5718,32 @@ namespace {
Work.DefaultErrorFunction());
}
- ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
- NiceBytes(AllBlocksFetch),
- NiceBytes(AllBlocksSlack),
- NiceBytes(AllBlockChunksSize),
- NiceBytes(AllBlocksSize),
- AllBlockRequests,
- BlocksNeededCount);
{
ZEN_TRACE_CPU("WriteChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
- FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
+ ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
+ uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+                                               DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
- RequestsComplete.load(),
+ DownloadStats.RequestsCompleteCount.load(),
TotalRequestCount,
- NiceBytes(BytesDownloaded.load()),
+ NiceBytes(DownloadedBytes),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- ChunkCountWritten.load(),
+ WriteChunkStats.ChunkCountWritten.load(),
ChunkCountToWrite,
- NiceBytes(WriteToDiskBytes.load()),
+ NiceBytes(DiskStats.WriteByteCount.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState({.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
+ WriteProgressBar.UpdateState(
+ {.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
+ false);
});
}
@@ -5598,25 +5773,21 @@ namespace {
}
ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
+                                      DownloadStats.DownloadedPartialBlockByteCount.load();
ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. Completed in {}",
- NiceBytes(BytesDownloaded.load()),
- NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceBytes(WriteToDiskBytes.load()),
- NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())),
NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
- }
- std::vector<std::pair<IoHash, uint32_t>> Targets;
- Targets.reserve(RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
- {
- Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
- std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
- return Lhs.first < Rhs.first;
- });
// Move all files we will reuse to cache folder
// TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
@@ -5643,6 +5814,7 @@ namespace {
if (WipeTargetFolder)
{
ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
+ Stopwatch Timer;
// Clean target folder
ZEN_CONSOLE("Wiping {}", Path);
@@ -5650,10 +5822,12 @@ namespace {
{
ZEN_WARN("Some files in {} could not be removed", Path);
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
else
{
ZEN_TRACE_CPU("UpdateFolder_RemoveUnused");
+ Stopwatch Timer;
// Remove unused tracked files
tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
@@ -5683,10 +5857,12 @@ namespace {
std::filesystem::remove(LocalFilePath);
}
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
{
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
+ Stopwatch Timer;
WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -5700,6 +5876,16 @@ namespace {
std::atomic<uint64_t> TargetsComplete = 0;
+ std::vector<std::pair<IoHash, uint32_t>> Targets;
+ Targets.reserve(RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
+ return Lhs.first < Rhs.first;
+ });
+
size_t TargetOffset = 0;
while (TargetOffset < Targets.size())
{
@@ -5753,6 +5939,7 @@ namespace {
SetFileReadOnly(FirstTargetFilePath, false);
}
std::filesystem::rename(CacheFilePath, FirstTargetFilePath);
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
}
OutLocalFolderState.Attributes[FirstTargetPathIndex] =
@@ -5781,6 +5968,7 @@ namespace {
SetFileReadOnly(ExtraTargetFilePath, false);
}
CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
OutLocalFolderState.Attributes[ExtraTargetPathIndex] =
RemoteContent.Attributes.empty()
@@ -5815,6 +6003,8 @@ namespace {
});
}
+ RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
if (AbortFlag)
{
return;
@@ -6474,13 +6664,21 @@ namespace {
}
else
{
- ExtendableStringBuilder<128> SB;
+ ExtendableStringBuilder<128> BuildPartString;
for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts)
{
- SB.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
+ BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
FolderContent LocalFolderState;
+
+ DiskStatistics DiskStats;
+ CacheMappingStatistics CacheMappingStats;
+ DownloadStatistics DownloadStats;
+ WriteChunkStatistics WriteChunkStats;
+ RebuildFolderStateStatistics RebuildFolderStateStats;
+ VerifyFolderStatistics VerifyFolderStats;
+
UpdateFolder(Storage,
BuildId,
Path,
@@ -6492,11 +6690,16 @@ namespace {
LooseChunkHashes,
AllowPartialBlockRequests,
WipeTargetFolder,
- LocalFolderState);
+ LocalFolderState,
+ DiskStats,
+ CacheMappingStats,
+ DownloadStats,
+ WriteChunkStats,
+ RebuildFolderStateStats);
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path, PostDownloadVerify);
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
@@ -6510,8 +6713,37 @@ namespace {
CompactBinaryToJson(StateObject, SB);
WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
+ const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
+ DownloadStats.DownloadedPartialBlockCount.load();
+ const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+ DownloadStats.DownloadedPartialBlockByteCount.load();
+ const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs();
+
+ ZEN_CONSOLE(
+ "Downloaded build {}, parts:{} in {}\n"
+ " Download: {} ({}) {}bits/s\n"
+ " Write: {} ({}) {}B/s\n"
+ " Clean: {}\n"
+ " Finalize: {}\n"
+ " Verify: {}",
+ BuildId,
+ BuildPartString.ToView(),
+ NiceTimeSpanMs(DownloadTimeMs),
+
+ DownloadCount,
+ NiceBytes(DownloadByteCount),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
- ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ DiskStats.WriteCount.load(),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
}
if (CleanDirectory(ZenTempFolder, {}))
@@ -7504,6 +7736,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
+ Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
{
Oid BuildId = Oid::FromHexString(BuildIdString);
@@ -7527,6 +7760,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\n");
}
+ ZEN_CONSOLE("Completed in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
return 0;
}
@@ -7699,27 +7933,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
case 0:
{
uint64_t SourceSize = *FileSizeIt;
- if (SourceSize > 0)
+ if (SourceSize > 256)
{
Work.ScheduleWork(
GetMediumWorkerPool(EWorkloadType::Burst),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
- IoBuffer Scrambled(SourceSize);
+ bool IsReadOnly = SetFileReadOnly(FilePath, false);
{
- IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath);
- Scrambled.GetMutableView().CopyFrom(
- Source.GetView().Mid(SourceSize / 3, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid(SourceSize / 3)
- .CopyFrom(Source.GetView().Mid(0, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid((SourceSize / 3) * 2)
- .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3));
+ BasicFile Source(FilePath, BasicFile::Mode::kWrite);
+ uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u);
+ IoBuffer TempBuffer1(RangeSize);
+ IoBuffer TempBuffer2(RangeSize);
+ IoBuffer TempBuffer3(RangeSize);
+ Source.Read(TempBuffer1.GetMutableView().GetData(), RangeSize, 0);
+ Source.Read(TempBuffer2.GetMutableView().GetData(), RangeSize, SourceSize / 2);
+ Source.Read(TempBuffer3.GetMutableView().GetData(),
+ RangeSize,
+ SourceSize - RangeSize);
+ Source.Write(TempBuffer1, SourceSize / 2);
+ Source.Write(TempBuffer2, SourceSize - RangeSize);
+ Source.Write(TempBuffer3, SourceSize - 0);
}
- bool IsReadOnly = SetFileReadOnly(FilePath, false);
- WriteFile(FilePath, Scrambled);
if (IsReadOnly)
{
SetFileReadOnly(FilePath, true);
@@ -7957,7 +8193,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateStatistics ValidateStats;
+ DownloadStatistics DownloadStats;
+ ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 88c3bb5b9..ad6b6103c 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -158,9 +158,10 @@ class BaseEncoder
{
public:
[[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0;
- [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize = DefaultBlockSize) const = 0;
+ [[nodiscard]] virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const = 0;
};
class BaseDecoder
@@ -189,11 +190,13 @@ public:
uint64_t RawOffset,
uint64_t RawSize) const = 0;
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0;
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const = 0;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -207,13 +210,14 @@ public:
return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned());
}
- [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t /* BlockSize */) const final
+ [[nodiscard]] virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t /* BlockSize */) const final
{
UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
- Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
- Callback(HeaderData.GetSize(), RawData);
+ Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
+ Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData);
return true;
}
};
@@ -283,17 +287,19 @@ public:
[[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); }
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const final
{
if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() &&
Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize &&
(RawOffset + RawSize) <= Header.TotalRawSize)
{
- if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)))
+ if (!Callback(sizeof(BufferHeader) + RawOffset, RawSize, 0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)))
{
return false;
}
@@ -309,9 +315,10 @@ class BlockEncoder : public BaseEncoder
{
public:
virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final;
- virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize) const final;
+ virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize) const final;
protected:
virtual CompressionMethod GetMethod() const = 0;
@@ -460,9 +467,10 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
}
bool
-BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize = DefaultBlockSize) const
+BlockEncoder::CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const
{
ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31)));
@@ -504,13 +512,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
uint64_t CompressedBlockSize = CompressedBlock.GetSize();
if (RawBlockSize <= CompressedBlockSize)
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(FileRef.FileChunkOffset + RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize)));
CompressedBlockSize = RawBlockSize;
}
else
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(FileRef.FileChunkOffset + RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
}
@@ -540,12 +552,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
uint64_t CompressedBlockSize = CompressedBlock.GetSize();
if (RawBlockSize <= CompressedBlockSize)
{
- Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
+ Callback(RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
CompressedBlockSize = RawBlockSize;
}
else
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
}
@@ -582,7 +599,7 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes));
Header.Write(HeaderBuffer.GetMutableView());
- Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
+ Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
return true;
}
@@ -615,11 +632,13 @@ public:
MutableMemoryView RawView,
uint64_t RawOffset) const final;
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final;
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const final;
protected:
virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0;
@@ -743,11 +762,12 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB
}
bool
-BlockDecoder::DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+BlockDecoder::DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
if (Header.TotalCompressedSize != CompressedData.GetSize())
{
@@ -817,7 +837,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
Source.Detach();
return false;
}
- if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ if (!Callback(FileRef.FileChunkOffset + CompressedOffset,
+ CompressedBlockSize,
+ BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
{
Source.Detach();
@@ -827,6 +849,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
else
{
if (!Callback(
+ FileRef.FileChunkOffset + CompressedOffset,
+ BytesToUncompress,
BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(
IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
@@ -870,7 +894,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
{
return false;
}
- if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ if (!Callback(CompressedOffset,
+ UncompressedBlockSize,
+ BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
{
return false;
@@ -879,6 +905,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
else
{
if (!Callback(
+ CompressedOffset,
+ BytesToUncompress,
BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(
IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
@@ -1778,11 +1806,12 @@ CompressedBuffer::Compress(const SharedBuffer& RawData,
}
bool
-CompressedBuffer::CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- OodleCompressor Compressor,
- OodleCompressionLevel CompressionLevel,
- uint64_t BlockSize)
+CompressedBuffer::CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor,
+ OodleCompressionLevel CompressionLevel,
+ uint64_t BlockSize)
{
using namespace detail;
@@ -1995,9 +2024,10 @@ CompressedBuffer::DecompressToComposite() const
}
bool
-CompressedBuffer::DecompressToStream(uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+CompressedBuffer::DecompressToStream(
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
using namespace detail;
if (CompressedData)
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 74fd5f767..09fa6249d 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -74,11 +74,12 @@ public:
OodleCompressor Compressor = OodleCompressor::Mermaid,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
uint64_t BlockSize = 0);
- [[nodiscard]] ZENCORE_API static bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- OodleCompressor Compressor = OodleCompressor::Mermaid,
- OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
- uint64_t BlockSize = 0);
+ [[nodiscard]] ZENCORE_API static bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor = OodleCompressor::Mermaid,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
+ uint64_t BlockSize = 0);
/**
* Construct from a compressed buffer previously created by Compress().
@@ -207,9 +208,10 @@ public:
*
* @return True if the buffer is valid and can be decompressed.
*/
- [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
+ [[nodiscard]] ZENCORE_API bool DecompressToStream(
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
/** A null compressed buffer. */
static const CompressedBuffer Null;