aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-10 18:33:24 +0100
committerGitHub Enterprise <[email protected]>2025-03-10 18:33:24 +0100
commit7de3d4218ee5969af6147f9ab20bda538a136d9a (patch)
tree77e10f8e3a165275bbb0bfa516eb65a854cd75ec /src
parentpartial block fetch (#298) (diff)
downloadzen-7de3d4218ee5969af6147f9ab20bda538a136d9a.tar.xz
zen-7de3d4218ee5969af6147f9ab20bda538a136d9a.zip
pick up existing cache (#299)
- Improvement: Scavenge .zen temp folders for existing data (downloaded, decompressed or written) from previous failed run - Improvement: Faster abort during stream compression - Improvement: Try to move downloaded blobs with rename if possible avoiding an extra disk write - Improvement: Only clean temp folders on successful or cancelled build - keep it if download fails
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp2376
-rw-r--r--src/zen/cmds/builds_cmd.h8
-rw-r--r--src/zencore/basicfile.cpp37
-rw-r--r--src/zencore/compress.cpp51
-rw-r--r--src/zencore/include/zencore/basicfile.h2
-rw-r--r--src/zencore/include/zencore/compress.h2
-rw-r--r--src/zencore/workthreadpool.cpp2
-rw-r--r--src/zenutil/chunkblock.cpp51
-rw-r--r--src/zenutil/include/zenutil/chunkblock.h9
9 files changed, 1651 insertions, 887 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 1c9476b96..2c7a73fb3 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -83,15 +83,24 @@ namespace {
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
- const std::string ZenFolderName = ".zen";
- const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
- const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
- const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
- const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
- const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
- const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
- const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
- const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
+ const std::string ZenFolderName = ".zen";
+ const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
+ const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
+ const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
+
+ const std::string ZenTempCacheFolderName =
+ fmt::format("{}/cache", ZenTempFolderName); // Decompressed and verified data - chunks & sequences
+ const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); // Temp storage for whole and partial blocks
+ const std::string ZenTempChunkFolderName =
+ fmt::format("{}/chunks", ZenTempFolderName); // Temp storage for decompressed and validated chunks
+
+ const std::string ZenTempDownloadFolderName =
+    fmt::format("{}/download", ZenTempFolderName); // Temp storage for unverified downloaded blobs
+
+ const std::string ZenTempStorageFolderName =
+ fmt::format("{}/storage", ZenTempFolderName); // Temp storage folder for BuildStorage implementations
+
+ const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
const std::string UnsyncFolderName = ".unsync";
@@ -231,21 +240,31 @@ namespace {
return AuthToken;
}
- CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer,
- const std::filesystem::path& TempFolderPath,
- const IoHash& Hash,
- const std::string& Suffix = {})
+ bool IsBufferDiskBased(const IoBuffer& Buffer)
{
- // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
- std::span<const SharedBuffer> Segments = Buffer.GetSegments();
- ZEN_ASSERT(Buffer.GetSegments().size() > 0);
IoBufferFileReference FileRef;
- if (Segments.back().GetFileReference(FileRef))
+ if (Buffer.GetFileReference(FileRef))
{
- return Buffer;
+ return true;
}
+ return false;
+ }
+
+ bool IsBufferDiskBased(const CompositeBuffer& Buffer)
+ {
+ // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
+ std::span<const SharedBuffer> Segments = Buffer.GetSegments();
+ ZEN_ASSERT(Buffer.GetSegments().size() > 0);
+ return IsBufferDiskBased(Segments.back().AsIoBuffer());
+ }
+
+ IoBuffer WriteToTempFile(CompositeBuffer&& Buffer,
+ const std::filesystem::path& TempFolderPath,
+ const IoHash& Hash,
+ const std::string& Suffix = {})
+ {
std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred();
- return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath));
+ return WriteToTempFile(std::move(Buffer), TempFilePath);
}
class FilteredRate
@@ -1109,12 +1128,22 @@ namespace {
IoHashStream Hash;
bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
}
+ return false;
});
+ if (AbortFlag)
+ {
+ return CompositeBuffer{};
+ }
+
if (!CouldDecompress)
{
throw std::runtime_error(
@@ -1299,14 +1328,14 @@ namespace {
{
Work.ScheduleWork(
VerifyPool,
- [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) {
+ [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(IoBuffer(Payload), ChunkAttachment, CompressedSize, DecompressedSize);
+ ValidateBlob(std::move(Payload), ChunkAttachment, CompressedSize, DecompressedSize);
ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid",
ChunkAttachment,
NiceBytes(CompressedSize),
@@ -1349,14 +1378,14 @@ namespace {
{
Work.ScheduleWork(
VerifyPool,
- [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) {
+ [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateChunkBlock(IoBuffer(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+ ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid",
BlockAttachment,
NiceBytes(CompressedSize),
@@ -1564,8 +1593,12 @@ namespace {
{
throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
}
- CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash);
- return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed();
+ if (!IsBufferDiskBased(CompressedBlob.GetCompressed()))
+ {
+ IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash);
+ CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
+ }
+ return std::move(CompressedBlob).GetCompressed();
}
struct GeneratedBlocks
@@ -1637,9 +1670,13 @@ namespace {
OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(),
- Path / ZenTempBlockFolderName,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ if (!IsBufferDiskBased(CompressedBlock.GetCompressed()))
+ {
+ IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlock).GetCompressed(),
+ Path / ZenTempBlockFolderName,
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ CompressedBlock = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
+ }
{
CbObjectWriter Writer;
Writer.AddString("createdBy", "zen");
@@ -1663,7 +1700,7 @@ namespace {
PendingUploadCount++;
Work.ScheduleWork(
UploadBlocksPool,
- [&, BlockIndex, Payload = std::move(Payload)](std::atomic<bool>&) {
+ [&, BlockIndex, Payload = std::move(CompressedBlock).GetCompressed()](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
@@ -1682,12 +1719,16 @@ namespace {
BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
OutBlocks.BlockMetaDatas[BlockIndex]);
- const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
- Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
- UploadStats.BlocksBytes += Payload.GetSize();
+ const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ const uint64_t CompressedBlockSize = Payload.GetSize();
+ Storage.PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload));
+ UploadStats.BlocksBytes += CompressedBlockSize;
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(Payload.GetSize()),
+ NiceBytes(CompressedBlockSize),
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
Storage.PutBlockMetadata(BuildId,
@@ -1817,7 +1858,7 @@ namespace {
auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) {
Work.ScheduleWork(
UploadChunkPool,
- [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) {
+ [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
FilteredUploadedBytesPerSecond.Start();
@@ -1855,7 +1896,7 @@ namespace {
auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
Work.ScheduleWork(
UploadChunkPool,
- [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) {
+ [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
const uint64_t PayloadSize = Payload.GetSize();
@@ -1868,7 +1909,7 @@ namespace {
ZenContentType::kCompressedBinary,
PayloadSize,
[Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset,
- uint64_t Size) -> IoBuffer {
+ uint64_t Size) mutable -> IoBuffer {
FilteredUploadedBytesPerSecond.Start();
IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer();
@@ -1974,9 +2015,11 @@ namespace {
}
ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(),
- Path / ZenTempBlockFolderName,
- BlockDescription.BlockHash);
+ CompositeBuffer Payload = IsBufferDiskBased(CompressedBlock.GetCompressed())
+ ? std::move(CompressedBlock).GetCompressed()
+ : CompositeBuffer(WriteToTempFile(std::move(CompressedBlock).GetCompressed(),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash));
GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
GenerateBlocksStats.GeneratedBlockCount++;
@@ -3289,33 +3332,40 @@ namespace {
uint64_t FileOffset,
uint64_t TargetFinalSize)
{
+ ZEN_TRACE_CPU("WriteFileCache_WriteToFile");
if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex)
{
+ ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
ZEN_ASSERT(OpenFileWriter);
OpenFileWriter->Write(Buffer, FileOffset);
}
else
{
- Flush();
- const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
- CreateDirectories(TargetPath.parent_path());
- uint32_t Tries = 5;
- std::unique_ptr<BasicFile> NewOutputFile(
- std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
- if (Tries < 3)
- {
- ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
- }
- if (Tries > 1)
- {
- Sleep(100);
- }
- return --Tries > 0;
- }));
+ std::unique_ptr<BasicFile> NewOutputFile;
+ {
+ ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open");
+ Flush();
+ const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
+ CreateDirectories(TargetPath.parent_path());
+ uint32_t Tries = 5;
+ NewOutputFile =
+ std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
+ if (Tries < 3)
+ {
+ ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
+ }
+ if (Tries > 1)
+ {
+ Sleep(100);
+ }
+ return --Tries > 0;
+ });
+ }
const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
if (CacheWriter)
{
+ ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());
OutputFile = std::move(NewOutputFile);
@@ -3325,6 +3375,7 @@ namespace {
}
else
{
+ ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
NewOutputFile->Write(Buffer, FileOffset);
}
}
@@ -3332,6 +3383,7 @@ namespace {
void Flush()
{
+ ZEN_TRACE_CPU("WriteFileCache_Flush");
OpenFileWriter = {};
OutputFile = {};
}
@@ -3376,7 +3428,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const BlockWriteOps Ops,
+ const BlockWriteOps& Ops,
std::atomic<uint32_t>& OutChunksComplete,
std::atomic<uint64_t>& OutBytesWritten)
{
@@ -3420,13 +3472,19 @@ namespace {
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
{
- throw std::runtime_error(
- fmt::format("Written hunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ ZEN_TRACE_CPU("VerifyChunkHash");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
}
+ ZEN_TRACE_CPU("VerifyChunkHashes_rename");
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
@@ -3437,139 +3495,38 @@ namespace {
bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& Lookup,
+ std::span<const IoHash> ChunkRawHashes,
+ std::span<const uint32_t> ChunkCompressedLengths,
+ std::span<const uint32_t> ChunkRawLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- const CompositeBuffer& DecompressedBlockBuffer,
+ CompositeBuffer&& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
BlockWriteOps& OutOps)
{
ZEN_TRACE_CPU("GetBlockWriteOps");
- SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten();
- uint64_t HeaderSize = 0;
- if (IterateChunkBlock(
- BlockBuffer,
- [&](CompressedBuffer&& Chunk, const IoHash& ChunkHash) {
- if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t ChunkIndex = It->second;
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- bool NeedsWrite = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
- {
- CompositeBuffer Decompressed = Chunk.DecompressToComposite();
- if (!Decompressed)
- {
- throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
- }
- ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
- ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
- {
- OutOps.WriteOps.push_back(
- BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
- }
- OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
- }
- }
- }
- },
- HeaderSize))
- {
- std::sort(OutOps.WriteOps.begin(),
- OutOps.WriteOps.end(),
- [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
-
- return true;
+ MemoryView BlockMemoryView;
+ UniqueBuffer BlockMemoryBuffer;
+ IoBufferFileReference FileRef = {};
+ if (PartialBlockBuffer.GetSegments().size() == 1 && PartialBlockBuffer.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
+ {
+ BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize);
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ Reader.Read(BlockMemoryBuffer.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ BlockMemoryView = BlockMemoryBuffer.GetView();
+ Reader.Detach();
}
else
{
- return false;
- }
- }
-
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkBlockDescription& BlockDescription,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const CompositeBuffer& BlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
- {
- ZEN_TRACE_CPU("WriteBlockToDisk");
-
- IoHash BlockRawHash;
- uint64_t BlockRawSize;
- CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(BlockBuffer, BlockRawHash, BlockRawSize);
- if (!CompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash));
- }
-
- if (BlockRawHash != BlockDescription.BlockHash)
- {
- throw std::runtime_error(
- fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, BlockRawHash));
- }
-
- CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
- if (!DecompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash));
- }
-
- ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer));
-
- BlockWriteOps Ops;
- if (GetBlockWriteOps(RemoteContent,
- Lookup,
- SequenceIndexChunksLeftToWriteCounters,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DecompressedBlockBuffer,
- Ops))
- {
- WriteBlockChunkOps(CacheFolderPath,
- RemoteContent,
- Lookup,
- SequenceIndexChunksLeftToWriteCounters,
- Ops,
- OutChunksComplete,
- OutBytesWritten);
- return true;
+ BlockMemoryView = PartialBlockBuffer.ViewOrCopyRange(0, PartialBlockBuffer.GetSize(), BlockMemoryBuffer);
}
- return false;
- }
-
- bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& Lookup,
- const ChunkBlockDescription& BlockDescription,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- const CompositeBuffer& PartialBlockBuffer,
- uint32_t FirstIncludedBlockChunkIndex,
- uint32_t LastIncludedBlockChunkIndex,
- BlockWriteOps& OutOps)
- {
- ZEN_TRACE_CPU("GetPartialBlockWriteOps");
uint32_t OffsetInBlock = 0;
for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
{
- const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex];
if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
{
const uint32_t ChunkIndex = It->second;
@@ -3581,7 +3538,9 @@ namespace {
bool NeedsWrite = true;
if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
{
- CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
+ // CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
+ MemoryView ChunkMemory = BlockMemoryView.Mid(OffsetInBlock, ChunkCompressedSize);
+ CompositeBuffer Chunk = CompositeBuffer(IoBuffer(IoBuffer::Wrap, ChunkMemory.GetData(), ChunkMemory.GetSize()));
IoHash VerifyChunkHash;
uint64_t VerifyRawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
@@ -3593,9 +3552,12 @@ namespace {
{
ZEN_ASSERT(false);
}
- if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+ if (!ChunkRawLengths.empty())
{
- ZEN_ASSERT(false);
+ if (VerifyRawSize != ChunkRawLengths[ChunkBlockIndex])
+ {
+ ZEN_ASSERT(false);
+ }
}
CompositeBuffer Decompressed = Compressed.DecompressToComposite();
if (!Decompressed)
@@ -3632,11 +3594,85 @@ namespace {
return true;
}
+ bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ CompositeBuffer&& BlockBuffer,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WriteBlockToDisk");
+
+ BlockWriteOps Ops;
+ if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
+ {
+ ZEN_TRACE_CPU("WriteBlockToDisk_Legacy");
+
+ UniqueBuffer CopyBuffer;
+ const MemoryView BlockView = BlockBuffer.ViewOrCopyRange(0, BlockBuffer.GetSize(), CopyBuffer);
+ uint64_t HeaderSize;
+ const std::vector<uint32_t> ChunkCompressedLengths = ReadChunkBlockHeader(BlockView, HeaderSize);
+
+ CompositeBuffer PartialBlockBuffer = std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize);
+
+ if (GetBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription.ChunkRawHashes,
+ ChunkCompressedLengths,
+ BlockDescription.ChunkRawLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::move(PartialBlockBuffer),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
+ return true;
+ }
+ return false;
+ }
+
+ CompositeBuffer PartialBlockBuffer =
+ std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ if (GetBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ BlockDescription.ChunkRawLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::move(PartialBlockBuffer),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
+ return true;
+ }
+ return false;
+ }
+
bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath,
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const CompositeBuffer& PartialBlockBuffer,
+ CompositeBuffer&& PartialBlockBuffer,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
@@ -3646,15 +3682,17 @@ namespace {
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
BlockWriteOps Ops;
- if (GetPartialBlockWriteOps(RemoteContent,
- Lookup,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- PartialBlockBuffer,
- FirstIncludedBlockChunkIndex,
- LastIncludedBlockChunkIndex,
- Ops))
+ if (GetBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ BlockDescription.ChunkRawLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::move(PartialBlockBuffer),
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
{
WriteBlockChunkOps(CacheFolderPath,
RemoteContent,
@@ -3671,7 +3709,7 @@ namespace {
}
}
- SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
+ SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
{
ZEN_TRACE_CPU("Decompress");
@@ -3709,7 +3747,7 @@ namespace {
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
- const CompositeBuffer& ChunkData,
+ CompositeBuffer&& ChunkData,
WriteFileCache& OpenFileCache,
std::atomic<uint64_t>& OutBytesWritten)
{
@@ -3734,8 +3772,8 @@ namespace {
}
}
- bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+ bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
{
if (Locations.size() == 1)
{
@@ -3776,13 +3814,25 @@ namespace {
}
IoHashStream Hash;
bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- DecompressedTemp.Write(RangeBuffer, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ DecompressedTemp.Write(RangeBuffer, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ WriteToDiskBytes += RangeBuffer.GetSize();
+ return true;
}
- WriteToDiskBytes += RangeBuffer.GetSize();
+ return false;
});
+
+ if (AbortFlag)
+ {
+ return;
+ }
+
if (!CouldDecompress)
{
throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
@@ -3801,9 +3851,82 @@ namespace {
}
}
+ bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& RemoteLookup,
+ const IoHash& ChunkHash,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ IoBuffer&& CompressedPart,
+ std::atomic<uint64_t>& WriteToDiskBytes)
+ {
+ auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
+ {
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes);
+ }
+ else
+ {
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ SharedBuffer Chunk =
+ Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+
+ if (!AbortFlag)
+ {
+ WriteFileCache OpenFileCache;
+
+ WriteChunkToDisk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkTargetPtrs,
+ CompositeBuffer(std::move(Chunk)),
+ OpenFileCache,
+ WriteToDiskBytes);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void CompleteChunkTargets(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ const IoHash& ChunkHash,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const bool NeedHashVerify)
+ {
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ {
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ if (NeedHashVerify)
+ {
+ ZEN_TRACE_CPU("VerifyChunkHash");
+
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != ChunkHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash));
+ }
+ }
+
+ ZEN_TRACE_CPU("RenameToFinal");
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ }
+ }
+ }
+
void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& TempFolderPath,
- const std::filesystem::path& CacheFolderPath,
+ const std::filesystem::path& Path,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
const Oid& BuildId,
@@ -3818,6 +3941,7 @@ namespace {
std::atomic<uint64_t>& BytesDownloaded,
std::atomic<uint64_t>& MultipartAttachmentCount,
std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete,
+ std::function<void()>&& OnWriteStart,
std::function<void()>&& OnWriteComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -3828,8 +3952,11 @@ namespace {
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ std::filesystem::path DownloadFolder = Path / ZenTempDownloadFolderName;
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+
std::error_code Ec;
- Workload->TempFile.CreateTemporary(TempFolderPath, Ec);
+ Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
if (Ec)
{
throw std::runtime_error(
@@ -3839,7 +3966,8 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [&CacheFolderPath,
+ [DownloadFolder,
+ TargetFolder,
&RemoteContent,
&RemoteLookup,
&Work,
@@ -3849,6 +3977,7 @@ namespace {
&BytesDownloaded,
OnDownloadComplete = std::move(OnDownloadComplete),
OnWriteComplete = std::move(OnWriteComplete),
+ OnWriteStart = std::move(OnWriteStart),
&WriteToDiskBytes,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
@@ -3857,6 +3986,7 @@ namespace {
if (!AbortFlag.load())
{
+ ZEN_TRACE_CPU("DownloadLargeBlob_Save");
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
@@ -3864,103 +3994,61 @@ namespace {
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
- [&CacheFolderPath,
+ [DownloadFolder,
+ TargetFolder,
&RemoteContent,
&RemoteLookup,
ChunkHash,
Workload,
Offset,
OnWriteComplete = std::move(OnWriteComplete),
+ OnWriteStart = std::move(OnWriteStart),
&WriteToDiskBytes,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs](std::atomic<bool>&) {
- ZEN_TRACE_CPU("DownloadLargeBlob_Work");
+ ZEN_TRACE_CPU("DownloadLargeBlob_Write");
if (!AbortFlag)
{
- uint64_t CompressedSize = Workload->TempFile.FileSize();
- void* FileHandle = Workload->TempFile.Detach();
- IoBuffer CompressedPart = IoBuffer(IoBuffer::File,
- FileHandle,
- 0,
- CompressedSize,
- /*IsWholeFile*/ true);
- if (!CompressedPart)
+ const std::filesystem::path CompressedChunkPath = DownloadFolder / ChunkHash.ToHexString();
+ std::error_code Ec;
+ Workload->TempFile.MoveTemporaryIntoPlace(CompressedChunkPath, Ec);
+ if (Ec)
{
- throw std::runtime_error(
- fmt::format("Multipart build blob {} is not a compressed buffer", ChunkHash));
+ throw std::runtime_error(fmt::format("Failed moving downloaded chunk {} file to {}. Reason: {}",
+ ChunkHash,
+ CompressedChunkPath,
+ Ec.message()));
}
- CompressedPart.SetDeleteOnClose(true);
- auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- bool NeedHashVerify = true;
- if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
{
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
- StreamDecompress(CacheFolderPath,
- SequenceRawHash,
- CompositeBuffer(std::move(CompressedPart)),
- WriteToDiskBytes);
- NeedHashVerify = false;
- OnWriteComplete();
+ throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
}
- else
- {
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- // ZEN_ASSERT_SLOW(ChunkHash ==
- // IoHash::HashBuffer(Chunk.AsIoBuffer()));
+ OnWriteStart();
- if (!AbortFlag)
- {
- WriteFileCache OpenFileCache;
-
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- WriteToDiskBytes);
- OnWriteComplete();
- }
- }
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ WriteToDiskBytes);
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (NeedHashVerify)
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
+ std::filesystem::remove(CompressedChunkPath);
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != ChunkHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- ChunkHash));
- }
- }
-
- ZEN_TRACE_CPU("VerifyChunkHashes_rename");
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
- }
+ CompleteChunkTargets(TargetFolder,
+ RemoteContent,
+ ChunkHash,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ NeedHashVerify);
}
}
},
@@ -3977,6 +4065,7 @@ namespace {
Work.ScheduleWork(
NetworkPool, // GetSyncWorkerPool(),//
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("DownloadLargeBlob_Work");
if (!AbortFlag)
{
WorkItem();
@@ -4023,30 +4112,143 @@ namespace {
Stopwatch CacheMappingTimer;
- uint64_t CacheMappedBytesForReuse = 0;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
// std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false);
- std::vector<bool> RemoteChunkIndexIsCachedFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
// Guard if the same chunk is in multiple blocks (can happen due to block reuse; cache reuse writes blocks directly)
std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- // Pick up all whole files we can use from current local state
- for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
- RemoteSequenceIndex++)
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ uint64_t CachedChunkHashesByteCountFound = 0;
+ uint64_t CachedSequenceHashesByteCountFound = 0;
{
- const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end())
+ ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
+
+ DirectoryContent CacheDirContent;
+ GetDirectoryContent(CacheFolderPath,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
+ CacheDirContent);
+ for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++)
{
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
- const uint32_t RemotePathIndex = GetFirstPathIndexForRawHash(RemoteLookup, RemoteSequenceRawHash);
- CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex];
+ IoHash FileHash;
+ if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto ChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(FileHash);
+ ChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIt->second;
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (ChunkSize == CacheDirContent.FileSizes[Index])
+ {
+ CachedChunkHashesFound.insert({FileHash, ChunkIndex});
+ CachedChunkHashesByteCountFound += ChunkSize;
+ continue;
+ }
+ }
+ else if (auto SequenceIt = RemoteLookup.RawHashToSequenceIndex.find(FileHash);
+ SequenceIt != RemoteLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t SequenceIndex = SequenceIt->second;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex];
+ if (SequenceSize == CacheDirContent.FileSizes[Index])
+ {
+ CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
+ CachedSequenceHashesByteCountFound += SequenceSize;
+ continue;
+ }
+ }
+ }
+ std::filesystem::remove(CacheDirContent.Files[Index]);
}
- else
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ uint64_t CachedBlocksByteCountFound = 0;
+ {
+ ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes;
+ AllBlockSizes.reserve(BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockDescriptions.size(); BlockIndex++)
{
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex});
+ }
+
+ DirectoryContent BlockDirContent;
+ GetDirectoryContent(Path / ZenTempBlockFolderName,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
+ BlockDirContent);
+ CachedBlocksFound.reserve(BlockDirContent.Files.size());
+ for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++)
+ {
+ IoHash FileHash;
+ if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end())
+ {
+ const uint32_t BlockIndex = BlockIt->second;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize;
+ for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths)
+ {
+ BlockSize += ChunkSize;
+ }
+
+ if (BlockSize == BlockDirContent.FileSizes[Index])
+ {
+ CachedBlocksFound.insert({FileHash, BlockIndex});
+ CachedBlocksByteCountFound += BlockSize;
+ continue;
+ }
+ }
+ }
+ std::filesystem::remove(BlockDirContent.Files[Index]);
}
}
+ std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
+ uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0;
+ // Pick up all whole files we can use from current local state
+ {
+ ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash);
+ CacheSequenceIt != CachedSequenceHashesFound.end())
+ {
+ // const uint32_t RemoteSequenceIndex = CacheSequenceIt->second;
+ // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex);
+ // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex];
+ }
+ else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash);
+ CacheChunkIt != CachedChunkHashesFound.end())
+ {
+ // const uint32_t RemoteChunkIndex = CacheChunkIt->second;
+ // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex);
+ // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex];
+ }
+ else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash);
+ It != LocalLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t LocalSequenceIndex = It->second;
+ const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
+ uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
+ LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
+ LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize;
+ }
+ else
+ {
+ // We must write the sequence
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] =
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ }
+ }
+ }
// Pick up all chunks in current local state
struct CacheCopyData
{
@@ -4063,75 +4265,103 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
+ uint64_t LocalChunkHashesMatchingRemoteCount = 0;
+ uint64_t LocalChunkHashesMatchingRemoteByteCount = 0;
- for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size();
- LocalSequenceIndex++)
{
- const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
- const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
+ ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
+ for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size();
+ LocalSequenceIndex++)
{
- uint64_t SourceOffset = 0;
- const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
- for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
- {
- const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
- const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+ const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
+ const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
- if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
- RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ uint64_t SourceOffset = 0;
+ const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
+ for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
{
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
+ const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
+ const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
- if (!ChunkTargetPtrs.empty())
+ if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
+ RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- CacheCopyData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .ChunkRawSize = LocalChunkRawSize,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
- CopySourceIt != RawHashToCacheCopyDataIndex.end())
- {
- CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- else
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
{
- RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
- CacheCopyDatas.push_back(
- CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
+ CacheCopyData::ChunkTarget Target = {
+ .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .ChunkRawSize = LocalChunkRawSize,
+ .CacheFileOffset = SourceOffset};
+ if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
+ CopySourceIt != RawHashToCacheCopyDataIndex.end())
+ {
+ CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
+ CacheCopyDatas.push_back(
+ CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
+ CacheCopyDatas.push_back(
+ CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
+ }
+ LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize;
+ LocalChunkHashesMatchingRemoteCount++;
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
}
- CacheMappedBytesForReuse += LocalChunkRawSize;
- RemoteChunkIndexIsCachedFlags[RemoteChunkIndex] = true;
}
}
+ SourceOffset += LocalChunkRawSize;
}
- SourceOffset += LocalChunkRawSize;
}
}
}
- if (CacheMappedBytesForReuse > 0)
+ if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() ||
+ !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0)
{
- ZEN_CONSOLE("Mapped {} cached data for reuse in {}",
- NiceBytes(CacheMappedBytesForReuse),
- NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs()));
+ ZEN_CONSOLE(
+ "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks",
+ CachedSequenceHashesFound.size(),
+ NiceBytes(CachedSequenceHashesByteCountFound),
+ CachedChunkHashesFound.size(),
+ NiceBytes(CachedChunkHashesByteCountFound),
+ CachedBlocksFound.size(),
+ NiceBytes(CachedBlocksByteCountFound),
+ LocalPathIndexesMatchingSequenceIndexes.size(),
+ NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes),
+ LocalChunkHashesMatchingRemoteCount,
+ NiceBytes(LocalChunkHashesMatchingRemoteByteCount));
}
uint32_t ChunkCountToWrite = 0;
for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
ChunkCountToWrite++;
}
@@ -4154,7 +4384,7 @@ namespace {
std::atomic<size_t> WritePartsComplete = 0;
{
- ZEN_TRACE_CPU("HandleChunks");
+ ZEN_TRACE_CPU("WriteChunks");
Stopwatch WriteTimer;
@@ -4169,17 +4399,21 @@ namespace {
std::atomic<uint64_t> BytesDownloaded = 0;
- for (const IoHash ChunkHash : LooseChunkHashes)
+ struct LooseChunkHashWorkData
{
- if (AbortFlag)
- {
- break;
- }
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ uint32_t RemoteChunkIndex;
+ };
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ TotalPartWriteCount += CacheCopyDatas.size();
+
+ for (const IoHash ChunkHash : LooseChunkHashes)
+ {
auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex])
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
continue;
@@ -4198,173 +4432,192 @@ namespace {
{
TotalRequestCount++;
TotalPartWriteCount++;
- Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(),//
- [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_LooseChunk");
+ LooseChunkHashWorks.push_back(
+ LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
+ }
+ }
- if (!AbortFlag)
+ uint32_t BlockCount = gsl::narrow<uint32_t>(BlockDescriptions.size());
+
+ std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);
+ auto GetNeededChunkBlockIndexes = [&RemoteContent,
+ &RemoteLookup,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ ZEN_TRACE_CPU("UpdateFolder_GetNeededChunkBlockIndexes");
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
+ {
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ {
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ {
+ ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
+ NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ }
+ }
+ }
+ }
+ return NeededBlockChunkIndexes;
+ };
+
+ std::vector<uint32_t> CachedChunkBlockIndexes;
+
+ struct BlockRangeDescriptor
+ {
+ uint32_t BlockIndex = (uint32_t)-1;
+ uint64_t RangeStart = 0;
+ uint64_t RangeLength = 0;
+ uint32_t ChunkBlockIndexStart = 0;
+ uint32_t ChunkBlockIndexCount = 0;
+ };
+ std::vector<BlockRangeDescriptor> BlockRangeWorks;
+
+ std::vector<uint32_t> FullBlockWorks;
+
+ size_t BlocksNeededCount = 0;
+ uint64_t AllBlocksSize = 0;
+ uint64_t AllBlocksFetch = 0;
+ uint64_t AllBlocksSlack = 0;
+ uint64_t AllBlockRequests = 0;
+ uint64_t AllBlockChunksSize = 0;
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
+ {
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
+ {
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
+
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ if (std::filesystem::exists(BlockPath))
+ {
+ CachedChunkBlockIndexes.push_back(BlockIndex);
+ UsingCachedBlock = true;
+ }
+ }
+
+ if (!UsingCachedBlock)
+ {
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload =
+ (BlockDescription.HeaderSize > 0) &&
+ (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ std::vector<BlockRangeDescriptor> BlockRanges;
+
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
+
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- FilteredDownloadedBytesPerSecond.Start();
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ if (NextRange.RangeLength > 0)
{
- DownloadLargeBlob(
- Storage,
- Path / ZenTempChunkFolderName,
- CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- NetworkPool,
- WriteToDiskBytes,
- BytesDownloaded,
- MultipartAttachmentCount,
- [&](uint64_t BytesDownloaded) {
- LooseChunksBytes += BytesDownloaded;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- },
- [&]() {
- ChunkCountWritten++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- });
+ BlockRanges.push_back(NextRange);
+ NextRange = {.BlockIndex = BlockIndex};
}
- else
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ AllBlockChunksSize += ChunkCompressedLength;
+ if (NextRange.RangeLength == 0)
{
- IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- BytesDownloaded += CompressedPart.GetSize();
- LooseChunksBytes += CompressedPart.GetSize();
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)),
- Path / ZenTempChunkFolderName,
- ChunkHash);
- DownloadedChunks++;
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ AllBlocksSize += CurrentOffset;
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(Payload)](
- std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_WriteBlob");
+ ZEN_ASSERT(!BlockRanges.empty());
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ uint64_t TotalSlack = 0;
+ while (It != BlockRanges.end())
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ {
+ LastRange.ChunkBlockIndexCount =
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ TotalSlack += Slack;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*It);
+ }
+ ++It;
+ }
- if (!AbortFlag)
- {
- FilteredWrittenBytesPerSecond.Start();
+ uint64_t TotalFetch = 0;
+ for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
+ {
+ TotalFetch += Range.RangeLength;
+ }
- bool NeedHashVerify = true;
- if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent
- .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
- StreamDecompress(CacheFolderPath,
- SequenceRawHash,
- CompositeBuffer(CompressedPart),
- WriteToDiskBytes);
- ChunkCountWritten++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- NeedHashVerify = false;
- }
- else
- {
- SharedBuffer Chunk =
- Decompress(CompressedPart,
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
-
- {
- WriteFileCache OpenFileCache;
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- WriteToDiskBytes);
- ChunkCountWritten++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- }
- if (!AbortFlag)
- {
- WritePartsComplete++;
-
- // Write tracking, updating this must be done without any files open
- // (WriteFileCache)
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location :
- ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(
- 1) == 1)
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (NeedHashVerify)
- {
- ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
-
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath,
- SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match "
- "expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
-
- ZEN_TRACE_CPU("UpdateFolder_rename");
- std::filesystem::rename(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
- }
- }
- }
- },
- Work.DefaultErrorFunction());
- }
- }
- }
- },
- Work.DefaultErrorFunction());
+ AllBlocksFetch += TotalFetch;
+ AllBlocksSlack += TotalSlack;
+ BlocksNeededCount++;
+ AllBlockRequests += CollapsedBlockRanges.size();
+
+ TotalRequestCount += CollapsedBlockRanges.size();
+ TotalPartWriteCount += CollapsedBlockRanges.size();
+
+ BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ }
+ else
+ {
+ BlocksNeededCount++;
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
}
}
+ else
+ {
+ ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ }
}
for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
@@ -4374,14 +4627,12 @@ namespace {
break;
}
- TotalPartWriteCount++;
-
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
[&, CopyDataIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
- ZEN_TRACE_CPU("UpdateFolder_Copy");
+ ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
FilteredWrittenBytesPerSecond.Start();
const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
@@ -4403,39 +4654,45 @@ namespace {
};
std::vector<WriteOp> WriteOps;
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ if (!AbortFlag)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
{
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
}
- TargetStart += ChunkTarget.TargetChunkLocationCount;
- }
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
+ });
+ }
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("Write");
+
BufferedOpenFile SourceFile(LocalFilePath);
WriteFileCache OpenFileCache;
for (const WriteOp& Op : WriteOps)
@@ -4473,23 +4730,26 @@ namespace {
// Write tracking, updating this must be done without any files open (WriteFileCache)
for (const WriteOp& Op : WriteOps)
{
- ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
-
const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
{
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
+ ZEN_TRACE_CPU("VerifyHash");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
}
- ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
+ ZEN_TRACE_CPU("rename");
+ ZEN_ASSERT_SLOW(
+ !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
@@ -4508,300 +4768,643 @@ namespace {
Work.DefaultErrorFunction());
}
- size_t BlockCount = BlockDescriptions.size();
-
- std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);
- auto GetNeededChunkBlockIndexes = [&RemoteContent,
- &RemoteLookup,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
- std::vector<uint32_t> NeededBlockChunkIndexes;
- for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
- {
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
- {
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
- {
- ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
- NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
- }
- }
- }
- }
- return NeededBlockChunkIndexes;
- };
-
- size_t BlocksNeededCount = 0;
- uint64_t AllBlocksSize = 0;
- uint64_t AllBlocksFetch = 0;
- uint64_t AllBlocksSlack = 0;
- uint64_t AllBlockRequests = 0;
- uint64_t AllBlockChunksSize = 0;
- for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
+ for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
- if (Work.IsAborted())
+ if (AbortFlag)
{
break;
}
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
- if (!BlockChunkIndexNeeded.empty())
- {
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
- BlockDescription.ChunkRawHashes.size());
- if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
- {
- struct BlockRangeDescriptor
- {
- uint64_t RangeStart = 0;
- uint64_t RangeLength = 0;
- uint32_t ChunkBlockIndexStart = 0;
- uint32_t ChunkBlockIndexCount = 0;
- };
- std::vector<BlockRangeDescriptor> BlockRanges;
-
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys");
-
- uint32_t NeedBlockChunkIndexOffset = 0;
- uint32_t ChunkBlockIndex = 0;
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
-
- BlockRangeDescriptor NextRange;
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
- ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ std::move(LooseChunkHashWork.ChunkTargetPtrs);
+ const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
+
+ Work.ScheduleWork(
+ NetworkPool, // NetworkPool, // GetSyncWorkerPool(),//
+ [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- NextRange = {};
- }
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ std::filesystem::path ExistingCompressedChunkPath;
{
- AllBlockChunksSize += ChunkCompressedLength;
- if (NextRange.RangeLength == 0)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ if (std::filesystem::exists(CompressedChunkPath))
{
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
+ if (ExistingCompressedPart)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
+ {
+ LooseChunksBytes += ExistingCompressedPart.GetSize();
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ ExistingCompressedChunkPath = std::move(CompressedChunkPath);
+ }
+ else
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(CompressedChunkPath, DummyEc);
+ }
+ }
}
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
}
- else
+ if (!ExistingCompressedChunkPath.empty())
{
- ZEN_ASSERT(false);
- }
- }
- AllBlocksSize += CurrentOffset;
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- NextRange = {};
- }
+ Work.ScheduleWork(
+ WritePool, // WritePool, GetSyncWorkerPool()
+ [&Path,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteToDiskBytes,
+ &ChunkCountWritten,
+ &WritePartsComplete,
+ &TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- ZEN_ASSERT(!BlockRanges.empty());
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
- auto It = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*It++);
- uint64_t TotalSlack = 0;
- while (It != BlockRanges.end())
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
- uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
- if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
- {
- LastRange.ChunkBlockIndexCount =
- (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
- TotalSlack += Slack;
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
+
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ WriteToDiskBytes);
+
+ if (!AbortFlag)
+ {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ std::filesystem::remove(CompressedChunkPath);
+
+ CompleteChunkTargets(TargetFolder,
+ RemoteContent,
+ ChunkHash,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ NeedHashVerify);
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
}
else
{
- CollapsedBlockRanges.push_back(*It);
- }
- ++It;
- }
-
- uint64_t TotalFetch = 0;
- for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
- {
- TotalFetch += Range.RangeLength;
- }
-
- AllBlocksFetch += TotalFetch;
- AllBlocksSlack += TotalSlack;
- BlocksNeededCount++;
- AllBlockRequests += CollapsedBlockRanges.size();
+ FilteredDownloadedBytesPerSecond.Start();
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(
+ Storage,
+ Path,
+ RemoteContent,
+ RemoteLookup,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ NetworkPool,
+ WriteToDiskBytes,
+ BytesDownloaded,
+ MultipartAttachmentCount,
+ [&](uint64_t BytesDownloaded) {
+ LooseChunksBytes += BytesDownloaded;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ },
+ [&]() { FilteredWrittenBytesPerSecond.Start(); },
+ [&]() {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ });
+ }
+ else
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++)
- {
- TotalRequestCount++;
- TotalPartWriteCount++;
- const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex];
- // Partial block schedule
- Work.ScheduleWork(
- NetworkPool, // NetworkPool, // GetSyncWorkerPool()
- [&, BlockIndex, BlockRange](std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- if (!BlockBuffer)
+ IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash);
+ if (!BuildBlob)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
- Path / ZenTempBlockFolderName,
- BlockDescription.BlockHash,
- fmt::format("_{}", BlockRange.RangeStart));
+ uint64_t BlobSize = BuildBlob.GetSize();
+ BytesDownloaded += BlobSize;
+ LooseChunksBytes += BlobSize;
RequestsComplete++;
if (RequestsComplete == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
- if (!AbortFlag)
+ std::filesystem::path CompressedChunkPath;
+
+ // Check if the dowloaded file is file based and we can move it directly without rewriting it
{
- Work.ScheduleWork(
- WritePool, // WritePool, // GetSyncWorkerPool(),
- [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) {
- if (!AbortFlag)
+ IoBufferFileReference FileRef;
+ if (BuildBlob.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlobSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempChunk");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BuildBlob.SetDeleteOnClose(false);
+ BuildBlob = {};
+ CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec);
+ if (Ec)
{
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ CompressedChunkPath = std::filesystem::path{};
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- BlockPartialBuffer,
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
- {
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BuildBlob = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlobSize, true);
+ BuildBlob.SetDeleteOnClose(true);
}
- },
- [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
- ZEN_ERROR("Failed writing block {}. Reason: {}",
- BlockDescriptions[BlockIndex].BlockHash,
- Ex.what());
- AbortFlag = true;
- });
+ }
+ }
}
- },
- Work.DefaultErrorFunction());
- }
- }
- else
- {
- BlocksNeededCount++;
- TotalRequestCount++;
- TotalPartWriteCount++;
-
- Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get");
- // Full block schedule
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
- if (!BlockBuffer)
+ if (CompressedChunkPath.empty() && (BlobSize > 512u * 1024u))
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempChunk");
+ // Could not be moved and rather large, lets store it on disk
+ CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ TemporaryFile::SafeWriteFile(CompressedChunkPath, BuildBlob);
+ BuildBlob = {};
}
+ DownloadedChunks++;
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
- Path / ZenTempBlockFolderName,
- BlockDescription.BlockHash);
if (!AbortFlag)
{
Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
+ WritePool, // WritePool, GetSyncWorkerPool()
+ [&Path,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteToDiskBytes,
+ &ChunkCountWritten,
+ &WritePartsComplete,
+ &TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath,
+ CompressedPart = std::move(BuildBlob)](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- BlockBuffer,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (CompressedChunkPath.empty())
+ {
+ ZEN_ASSERT(CompressedPart);
+ }
+ else
{
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ ZEN_ASSERT(!CompressedPart);
+ CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
}
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
+
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ WriteToDiskBytes);
+
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ if (!CompressedChunkPath.empty())
+ {
+ std::filesystem::remove(CompressedChunkPath);
+ }
+
+ CompleteChunkTargets(TargetFolder,
+ RemoteContent,
+ ChunkHash,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ NeedHashVerify);
}
}
},
Work.DefaultErrorFunction());
}
}
- },
- Work.DefaultErrorFunction());
- }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
+ for (uint32_t BlockIndex : CachedChunkBlockIndexes)
+ {
+ if (AbortFlag)
+ {
+ break;
}
- else
+
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(), // WritePool,
+ [&, BlockIndex](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ FilteredWrittenBytesPerSecond.Start();
+
+ std::filesystem::path BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ std::filesystem::remove(BlockChunkPath);
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
+ {
+ if (AbortFlag)
{
- ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ break;
}
+ const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
+ ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ Work.ScheduleWork(
+ NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer =
+ Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ BytesDownloaded += BlockSize;
+ BlockBytes += BlockSize;
+ DownloadedBlocks++;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath =
+ Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool, // WritePool, // GetSyncWorkerPool(),
+ [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
+ std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
}
+
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ BytesDownloaded += BlockSize;
+ BlockBytes += BlockSize;
+ DownloadedBlocks++;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool, // WritePool, GetSyncWorkerPool()
+ [&RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &ChunkCountWritten,
+ &WriteToDiskBytes,
+ &WritePartsComplete,
+ &TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
NiceBytes(AllBlocksFetch),
NiceBytes(AllBlocksSlack),
@@ -4809,28 +5412,30 @@ namespace {
NiceBytes(AllBlocksSize),
AllBlockRequests,
BlocksNeededCount);
- ZEN_TRACE_CPU("HandleChunks_Wait");
-
- Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
- FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
- std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
- RequestsComplete.load(),
- TotalRequestCount,
- NiceBytes(BytesDownloaded.load()),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- NiceBytes(WriteToDiskBytes.load()),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState({.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
- });
+ {
+ ZEN_TRACE_CPU("WriteChunks_Wait");
+
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
+ FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
+ FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
+ std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ RequestsComplete.load(),
+ TotalRequestCount,
+ NiceBytes(BytesDownloaded.load()),
+ NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
+ NiceBytes(WriteToDiskBytes.load()),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
+ WriteProgressBar.UpdateState({.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
+ false);
+ });
+ }
FilteredWrittenBytesPerSecond.Stop();
FilteredDownloadedBytesPerSecond.Stop();
@@ -4842,19 +5447,32 @@ namespace {
WriteProgressBar.Finish();
- ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}",
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ {
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ {
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ }
+ }
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+
+ ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. Completed in {}",
NiceBytes(BytesDownloaded.load()),
NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
NiceBytes(WriteToDiskBytes.load()),
NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
}
- for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters)
- {
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() == 0);
- }
-
std::vector<std::pair<IoHash, uint32_t>> Targets;
Targets.reserve(RemoteContent.Paths.size());
for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
@@ -4867,17 +5485,24 @@ namespace {
// Move all files we will reuse to cache folder
// TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
- for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
+ if (!LocalPathIndexesMatchingSequenceIndexes.empty())
{
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
+ ZEN_TRACE_CPU("UpdateFolder_CacheReused");
+ uint64_t TotalFullFileSizeCached = 0;
+ for (uint32_t LocalPathIndex : LocalPathIndexesMatchingSequenceIndexes)
{
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath));
SetFileReadOnly(LocalFilePath, false);
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath));
std::filesystem::rename(LocalFilePath, CacheFilePath);
+ TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath);
}
+ ZEN_CONSOLE("Saved {} ({}) unchanged files in cache",
+ LocalPathIndexesMatchingSequenceIndexes.size(),
+ NiceBytes(TotalFullFileSizeCached));
}
if (WipeTargetFolder)
@@ -4923,6 +5548,8 @@ namespace {
}
{
+ ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
+
WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
ProgressBar RebuildProgressBar(UsePlainProgress);
@@ -4943,8 +5570,6 @@ namespace {
break;
}
- ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
-
size_t TargetCount = 1;
const IoHash& RawHash = Targets[TargetOffset].first;
while (Targets[TargetOffset + TargetCount].first == RawHash)
@@ -5038,17 +5663,19 @@ namespace {
TargetOffset += TargetCount;
}
- ZEN_TRACE_CPU("FinalizeTree_Wait");
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Wait");
- Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
- std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size());
- RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
- .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
- false);
- });
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size());
+ RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
+ .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
+ false);
+ });
+ }
if (AbortFlag)
{
@@ -5608,15 +6235,10 @@ namespace {
const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName;
CreateDirectories(ZenTempFolder);
- auto _ = MakeGuard([&]() {
- CleanDirectory(ZenTempFolder, {});
- std::filesystem::remove(ZenTempFolder);
- });
+
CreateDirectories(Path / ZenTempBlockFolderName);
- CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use
- CreateDirectories(Path /
- ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension)
- // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
+ CreateDirectories(Path / ZenTempCacheFolderName);
+ CreateDirectories(Path / ZenTempDownloadFolderName);
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -5748,6 +6370,8 @@ namespace {
ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
}
}
+ CleanDirectory(ZenTempFolder, {});
+ std::filesystem::remove(ZenTempFolder);
}
void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked)
@@ -6191,6 +6815,15 @@ BuildsCommand::BuildsCommand()
"<name>");
m_ValidateBuildPartOptions.parse_positional({"build-id", "build-part-id"});
m_ValidateBuildPartOptions.positional_help("build-id build-part-id");
+
+ AddCloudOptions(m_MultiTestDownloadOptions);
+ AddFileOptions(m_MultiTestDownloadOptions);
+ AddOutputOptions(m_MultiTestDownloadOptions);
+ m_MultiTestDownloadOptions
+ .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
+ m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>");
+ m_MultiTestDownloadOptions.parse_positional({"local-path"});
+ m_MultiTestDownloadOptions.positional_help("local-path");
}
BuildsCommand::~BuildsCommand() = default;
@@ -6667,6 +7300,79 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return AbortFlag ? 11 : 0;
}
+ if (SubOption == &m_MultiTestDownloadOptions)
+ {
+ if (m_Path.empty())
+ {
+            throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_MultiTestDownloadOptions.help()));
+ }
+
+ ParseStorageOptions();
+ ParseAuthOptions();
+
+ HttpClient Http(m_BuildsUrl, ClientSettings);
+ // m_StoragePath = "D:\\buildstorage";
+ // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient";
+ // std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587",
+ // "07d394eed89d769f2254e75d",
+ // "07d3953f22fa3f8000fa6f0a",
+ // "07d3959df47ed1f42ddbe44c",
+ // "07d395fa7803d50804f14417",
+ // "07d3964f919d577a321a1fdd",
+ // "07d396a6ce875004e16b9528"};
+
+ BuildStorage::Statistics StorageStats;
+ std::unique_ptr<BuildStorage> Storage;
+ std::string StorageName;
+ if (!m_BuildsUrl.empty())
+ {
+ ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
+ FormatArray<std::string>(m_BuildIds, " "sv),
+ m_Path,
+ m_BuildsUrl,
+ Http.GetSessionId(),
+ m_Namespace,
+ m_Bucket);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ StorageName = "Cloud DDC";
+ }
+ else if (!m_StoragePath.empty())
+ {
+            ZEN_CONSOLE("Downloading {} to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), m_Path, m_StoragePath);
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ }
+ else
+ {
+            throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_MultiTestDownloadOptions.help()));
+ }
+
+ for (const std::string& BuildIdString : m_BuildIds)
+ {
+ Oid BuildId = Oid::FromHexString(BuildIdString);
+ if (BuildId == Oid::Zero)
+ {
+                throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_MultiTestDownloadOptions.help()));
+ }
+ DownloadFolder(*Storage,
+ BuildId,
+ {},
+ {},
+ m_Path,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ BuildIdString == m_BuildIds.front(),
+ true);
+ if (AbortFlag)
+ {
+ ZEN_CONSOLE("Download cancelled");
+ return 11;
+ }
+ ZEN_CONSOLE("\n");
+ }
+ return 0;
+ }
+
if (SubOption == &m_TestOptions)
{
ParseStorageOptions();
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 838a17807..167a5d29f 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -92,18 +92,22 @@ private:
cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"};
+ cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"};
+ std::vector<std::string> m_BuildIds;
+
cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"};
std::string m_BlobHash;
cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"};
- cxxopts::Options* m_SubCommands[7] = {&m_ListOptions,
+ cxxopts::Options* m_SubCommands[8] = {&m_ListOptions,
&m_UploadOptions,
&m_DownloadOptions,
&m_DiffOptions,
&m_TestOptions,
&m_FetchBlobOptions,
- &m_ValidateBuildPartOptions};
+ &m_ValidateBuildPartOptions,
+ &m_MultiTestDownloadOptions};
};
} // namespace zen
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index 6e879ca0d..95876cff4 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -858,7 +858,7 @@ BasicFileWriter::Flush()
}
IoBuffer
-WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path)
+WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path)
{
TemporaryFile Temp;
std::error_code Ec;
@@ -868,6 +868,7 @@ WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path
throw std::system_error(Ec, fmt::format("Failed to create temp file for blob at '{}'", Path));
}
+ uint64_t BufferSize = Buffer.GetSize();
{
uint64_t Offset = 0;
static const uint64_t BufferingSize = 256u * 1024u;
@@ -899,21 +900,31 @@ WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path
Temp.MoveTemporaryIntoPlace(Path, Ec);
if (Ec)
{
- IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path);
- if (TmpBuffer)
+ Ec.clear();
+ BasicFile OpenTemp(Path, BasicFile::Mode::kDelete, Ec);
+ if (Ec)
{
- IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer);
- const IoHash ExpectedHash = IoHash::HashBuffer(Buffer);
- if (ExistingHash == ExpectedHash)
- {
- TmpBuffer.SetDeleteOnClose(true);
- return TmpBuffer;
- }
+ throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path));
}
- throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path));
- }
+ if (OpenTemp.FileSize() != BufferSize)
+ {
+ throw std::runtime_error(fmt::format("Failed to move temp file to '{}' - mismatching file size already exists", Path));
+ }
+ IoBuffer TmpBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BufferSize, true);
- IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path);
+ IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer);
+ const IoHash ExpectedHash = IoHash::HashBuffer(Buffer);
+ if (ExistingHash != ExpectedHash)
+ {
+ throw std::runtime_error(fmt::format("Failed to move temp file to '{}' - mismatching file hash already exists", Path));
+ }
+ Buffer = CompositeBuffer{};
+ TmpBuffer.SetDeleteOnClose(true);
+ return TmpBuffer;
+ }
+ Buffer = CompositeBuffer{};
+ BasicFile OpenTemp(Path, BasicFile::Mode::kDelete);
+ IoBuffer TmpBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BufferSize, true);
TmpBuffer.SetDeleteOnClose(true);
return TmpBuffer;
}
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 1844f6a63..88c3bb5b9 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -193,7 +193,7 @@ public:
const CompositeBuffer& CompressedData,
uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0;
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -287,13 +287,16 @@ public:
const CompositeBuffer& CompressedData,
uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final
{
if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() &&
Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize &&
(RawOffset + RawSize) <= Header.TotalRawSize)
{
- Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize));
+ if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)))
+ {
+ return false;
+ }
return true;
}
return false;
@@ -616,7 +619,7 @@ public:
const CompositeBuffer& CompressedData,
uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final;
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final;
protected:
virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0;
@@ -744,7 +747,7 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
const CompositeBuffer& CompressedData,
uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
if (Header.TotalCompressedSize != CompressedData.GetSize())
{
@@ -814,14 +817,23 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
Source.Detach();
return false;
}
- Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
- CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)));
+ if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
+ {
+ Source.Detach();
+ return false;
+ }
}
else
{
- Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
- CompositeBuffer(
- IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)));
+ if (!Callback(
+ BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(
+ IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
+ {
+ Source.Detach();
+ return false;
+ }
}
OffsetInFirstBlock = 0;
@@ -858,14 +870,21 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
{
return false;
}
- Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
- CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)));
+ if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
+ {
+ return false;
+ }
}
else
{
- Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
- CompositeBuffer(
- IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)));
+ if (!Callback(
+ BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(
+ IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
+ {
+ return false;
+ }
}
OffsetInFirstBlock = 0;
@@ -1978,7 +1997,7 @@ CompressedBuffer::DecompressToComposite() const
bool
CompressedBuffer::DecompressToStream(uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
using namespace detail;
if (CompressedData)
diff --git a/src/zencore/include/zencore/basicfile.h b/src/zencore/include/zencore/basicfile.h
index a78132879..57798b6f4 100644
--- a/src/zencore/include/zencore/basicfile.h
+++ b/src/zencore/include/zencore/basicfile.h
@@ -186,7 +186,7 @@ private:
uint64_t m_BufferEnd;
};
-IoBuffer WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path);
+IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path);
ZENCORE_API void basicfile_forcelink();
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 3969e9dbd..74fd5f767 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -209,7 +209,7 @@ public:
*/
[[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset,
uint64_t RawSize,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
+ std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
/** A null compressed buffer. */
static const CompressedBuffer Null;
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index d15fb2e83..445fe939e 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -274,7 +274,7 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
void
WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
{
- ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work)));
+ ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))));
}
[[nodiscard]] size_t
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
index a19cf5c1b..f3c14edc4 100644
--- a/src/zenutil/chunkblock.cpp
+++ b/src/zenutil/chunkblock.cpp
@@ -187,31 +187,54 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
return CompressedBlock;
}
-bool
-IterateChunkBlock(const SharedBuffer& BlockPayload,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
- uint64_t& OutHeaderSize)
+std::vector<uint32_t>
+ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize)
{
- ZEN_ASSERT(BlockPayload);
- if (BlockPayload.GetSize() < 1)
- {
- return false;
- }
-
- MemoryView BlockView = BlockPayload.GetView();
- const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
+ const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
uint32_t NumberSize;
uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
ReadPtr += NumberSize;
- std::vector<uint64_t> ChunkSizes;
+ std::vector<uint32_t> ChunkSizes;
ChunkSizes.reserve(ChunkCount);
while (ChunkCount--)
{
- ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
+ if (ReadPtr >= BlockView.GetDataEnd())
+ {
+ throw std::runtime_error("Invalid block header, block data ended unexpectedly");
+ }
+ uint64_t ChunkSize = ReadVarUInt(ReadPtr, NumberSize);
+ if (ChunkSize > std::numeric_limits<uint32_t>::max())
+ {
+ throw std::runtime_error("Invalid block header, header data is corrupt");
+ }
+ if (ChunkSize < 1)
+ {
+ throw std::runtime_error("Invalid block header, header data is corrupt");
+ }
+ ChunkSizes.push_back(gsl::narrow<uint32_t>(ChunkSize));
ReadPtr += NumberSize;
}
uint64_t Offset = std::distance((const uint8_t*)BlockView.GetData(), ReadPtr);
OutHeaderSize = Offset;
+ return ChunkSizes;
+}
+
+bool
+IterateChunkBlock(const SharedBuffer& BlockPayload,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
+ uint64_t& OutHeaderSize)
+{
+ ZEN_ASSERT(BlockPayload);
+ if (BlockPayload.GetSize() < 1)
+ {
+ return false;
+ }
+
+ MemoryView BlockView = BlockPayload.GetView();
+
+ std::vector<uint32_t> ChunkSizes = ReadChunkBlockHeader(BlockView, OutHeaderSize);
+ uint64_t Offset = OutHeaderSize;
+ OutHeaderSize = Offset;
for (uint64_t ChunkSize : ChunkSizes)
{
IoBuffer Chunk(BlockPayload.AsIoBuffer(), Offset, ChunkSize);
diff --git a/src/zenutil/include/zenutil/chunkblock.h b/src/zenutil/include/zenutil/chunkblock.h
index 21107fb7c..277580c74 100644
--- a/src/zenutil/include/zenutil/chunkblock.h
+++ b/src/zenutil/include/zenutil/chunkblock.h
@@ -31,9 +31,10 @@ CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block,
ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash);
typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc;
-CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock);
-bool IterateChunkBlock(const SharedBuffer& BlockPayload,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
- uint64_t& OutHeaderSize);
+CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock);
+bool IterateChunkBlock(const SharedBuffer& BlockPayload,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
+ uint64_t& OutHeaderSize);
+std::vector<uint32_t> ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize);
} // namespace zen