diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-03 16:21:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-03 16:21:01 +0200 |
| commit | a0b10b046095d57ffbdb46c83084601a832f4562 (patch) | |
| tree | fe015645ea07d83c2784e3e28d0e976a37054859 /src/zen/cmds/builds_cmd.cpp | |
| parent | minor: fix unused variable warning on some compilers (diff) | |
| download | archived-zen-a0b10b046095d57ffbdb46c83084601a832f4562.tar.xz archived-zen-a0b10b046095d57ffbdb46c83084601a832f4562.zip | |
fixed size chunking for encrypted files (#410)
- Improvement: Use fixed size block chunking for know encrypted/compressed file types
- Improvement: Skip trying to compress chunks that are sourced from files that are known to be encrypted/compressed
- Improvement: Add global open file cache for written files increasing throughput during download by reducing overhead of open/close of file by 80%
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 549 |
1 files changed, 286 insertions, 263 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index f6f15acb0..e13c90b4b 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -22,6 +22,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenutil/bufferedwritefilecache.h> #include <zenutil/buildstoragecache.h> #include <zenutil/chunkblock.h> #include <zenutil/chunkedcontent.h> @@ -105,6 +106,40 @@ namespace { } WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); } + static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv), + HashStringDjb2(".zip"sv), + HashStringDjb2(".7z"sv), + HashStringDjb2(".bzip"sv), + HashStringDjb2(".rar"sv), + HashStringDjb2(".gzip"sv), + HashStringDjb2(".apk"sv), + HashStringDjb2(".nsp"sv), + HashStringDjb2(".xvc"sv), + HashStringDjb2(".pkg"sv), + HashStringDjb2(".dmg"sv), + HashStringDjb2(".ipa"sv)}); + + static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end()); + + static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); } + + static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) + { + ZEN_UNUSED(Content); + const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; + if (ChunkLocationCount == 0) + { + return false; + } + const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; + const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; + + const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash); + return IsCompressable; + } + const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; const std::string ZenFolderName = ".zen"; @@ -313,7 +348,10 @@ namespace { std::atomic<uint64_t>& WriteByteCount) { BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate); - PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + } uint64_t Offset = 0; if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) { TargetFile.Write(Data, Size, Offset); @@ -600,24 +638,6 @@ namespace { return AuthToken; } - bool IsBufferDiskBased(const IoBuffer& Buffer) - { - IoBufferFileReference FileRef; - if (Buffer.GetFileReference(FileRef)) - { - return true; - } - return false; - } - - bool IsBufferDiskBased(const CompositeBuffer& Buffer) - { - // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory - std::span<const SharedBuffer> Segments = Buffer.GetSegments(); - ZEN_ASSERT(Buffer.GetSegments().size() > 0); - return IsBufferDiskBased(Segments.back().AsIoBuffer()); - } - IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash, @@ -1729,16 +1749,13 @@ namespace { { ZEN_ASSERT(false); } - uint64_t RawSize = Chunk.GetSize(); - if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) - { - // Standalone chunk, not part of a sequence - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)}; - } - else - { - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; - } + uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= MinimumSizeForCompressInBlock) && + IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = + ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; })); } @@ -1768,18 +1785,15 @@ namespace { Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); - const uint64_t RawSize = Chunk.GetSize(); - if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) - { - CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } - else - { - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } + const uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = + ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; @@ -2253,7 +2267,11 @@ namespace { { throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); } - ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash); + + const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + if (ShouldCompressChunk) { std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString(); @@ -2266,6 +2284,9 @@ namespace { fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message())); } + uint64_t StreamRawBytes = 0; + uint64_t StreamCompressedBytes = 0; + bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { @@ -2273,7 +2294,11 @@ namespace { LooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); - }); + StreamRawBytes += SourceSize; + StreamCompressedBytes += RangeBuffer.GetSize(); + }, + OodleCompressor::Mermaid, + CompressionLevel); if (CouldCompress) { uint64_t CompressedSize = CompressedFile.FileSize(); @@ -2296,22 +2321,36 @@ namespace { return Compressed.GetCompressed(); } + else + { + LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; + LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; + } CompressedFile.Close(); RemoveFile(TempFilePath, Ec); ZEN_UNUSED(Ec); } - // Try regular compress - decompress may fail if compressed data is larger than non-compressed - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource))); + CompressedBuffer CompressedBlob = + CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); if (!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } - if (!IsBufferDiskBased(CompressedBlob.GetCompressed())) + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); + + LooseChunksStats.CompressedChunkRawBytes += ChunkSize; + LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); + + // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk + if (ShouldCompressChunk) { IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash); CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); } + + LooseChunksStats.CompressedChunkCount++; return std::move(CompressedBlob).GetCompressed(); } @@ -3431,7 +3470,8 @@ namespace { LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); } - std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController(); + std::unique_ptr<ChunkingController> ChunkController = + CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); { CbObjectWriter ChunkParametersWriter; ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); @@ -4399,106 +4439,6 @@ namespace { } } - class WriteFileCache - { - public: - WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {} - ~WriteFileCache() { Flush(); } - - template<typename TBufferType> - void WriteToFile(uint32_t TargetIndex, - std::function<std::filesystem::path(uint32_t TargetIndex)>&& GetTargetPath, - const TBufferType& Buffer, - uint64_t FileOffset, - uint64_t TargetFinalSize) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile"); - if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT(OpenFileWriter); - OpenFileWriter->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - } - else - { - std::unique_ptr<BasicFile> NewOutputFile; - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open"); - Flush(); - const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); - CreateDirectories(TargetPath.parent_path()); - uint32_t Tries = 5; - NewOutputFile = - std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { - if (Tries < 3) - { - ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv); - } - if (Tries > 1) - { - Sleep(100); - } - return --Tries > 0; - }); - m_DiskStats.OpenWriteCount++; - m_DiskStats.CurrentOpenFileCount++; - } - - const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); - if (CacheWriter) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); - - OutputFile = std::move(NewOutputFile); - if (UseSparseFiles) - { - void* Handle = OutputFile->Handle(); - if (!PrepareFileForScatteredWrite(Handle, TargetFinalSize)) - { - ZEN_DEBUG("Unable to to prepare file '{}' with size {} for random write", - GetTargetPath(TargetIndex), - TargetFinalSize); - } - } - OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u)); - - OpenFileWriter->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - SeenTargetIndexes.push_back(TargetIndex); - } - else - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write"); - NewOutputFile->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - NewOutputFile = {}; - m_DiskStats.CurrentOpenFileCount--; - } - } - } - - void Flush() - { - ZEN_TRACE_CPU("WriteFileCache_Flush"); - if (OutputFile) - { - m_DiskStats.CurrentOpenFileCount--; - } - - OpenFileWriter = {}; - OutputFile = {}; - } - DiskStatistics& m_DiskStats; - std::vector<uint32_t> SeenTargetIndexes; - std::unique_ptr<BasicFile> OutputFile; - std::unique_ptr<BasicFileWriter> OpenFileWriter; - }; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const ChunkedContentLookup& Lookup, @@ -4581,7 +4521,7 @@ namespace { VerifySize, ExpectedSize)); } - ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); if (VerifyChunkHash != SequenceRawHash) { @@ -4651,6 +4591,77 @@ namespace { return CompletedSequenceIndexes; } + void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath, + const ChunkedFolderContent& RemoteContent, + BufferedWriteFileCache::Local& LocalWriter, + const CompositeBuffer& Chunk, + const uint32_t SequenceIndex, + const uint64_t FileOffset, + const uint32_t PathIndex, + DiskStatistics& DiskStats) + { + ZEN_TRACE_CPU("WriteSequenceChunk"); + + const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; + + auto OpenFile = [&](BasicFile& File) { + const std::filesystem::path FileName = + GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + File.Open(FileName, BasicFile::Mode::kWrite); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(File.Handle(), SequenceSize); + } + }; + + const uint64_t ChunkSize = Chunk.GetSize(); + ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); + if (ChunkSize == SequenceSize) + { + BasicFile SingleChunkFile; + OpenFile(SingleChunkFile); + + DiskStats.CurrentOpenFileCount++; + auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; }); + SingleChunkFile.Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + else + { + const uint64_t MaxWriterBufferSize = 256u * 1025u; + + BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); + if (Writer) + { + if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + else + { + Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); + + Writer->File = std::make_unique<BasicFile>(); + OpenFile(*Writer->File); + if (ChunkSize < MaxWriterBufferSize) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + } + } + struct BlockWriteOps { std::vector<CompositeBuffer> ChunkBuffers; @@ -4667,13 +4678,15 @@ namespace { const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, + BufferedWriteFileCache& WriteCache, ParallelWork& Work, WorkerThreadPool& VerifyPool, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); + { - WriteFileCache OpenFileCache(DiskStats); + BufferedWriteFileCache::Local LocalWriter(WriteCache); for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { if (AbortFlag) @@ -4685,25 +4698,15 @@ namespace { ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t ChunkSize = Chunk.GetSize(); const uint64_t FileOffset = WriteOp.Target->Offset; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); - OpenFileCache.WriteToFile<CompositeBuffer>( - SequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - Chunk, - FileOffset, - RemoteContent.RawSizes[PathIndex]); + WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats); } } if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) + // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) std::vector<uint32_t> CompletedChunkSequences; for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { @@ -4713,6 +4716,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + WriteCache.Close(CompletedChunkSequences); VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); } } @@ -4864,6 +4868,7 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -4896,6 +4901,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4920,6 +4926,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4939,6 +4946,7 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); @@ -4963,6 +4971,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4974,75 +4983,15 @@ namespace { } } - SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) - { - ZEN_TRACE_CPU("Decompress"); - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash)); - } - if (RawHash != ChunkHash) - { - throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash)); - } - if (RawSize != ChunkRawSize) - { - throw std::runtime_error( - fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize)); - } - if (!Compressed) - { - throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash)); - } - - SharedBuffer Decompressed = Compressed.Decompress(); - - if (!Decompressed) - { - throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); - } - return Decompressed; - } - - void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, - CompositeBuffer&& ChunkData, - WriteFileCache& OpenFileCache) - { - ZEN_TRACE_CPU("WriteChunkToDisk"); - - for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets) - { - const auto& Target = *TargetPtr; - const uint64_t FileOffset = Target.Offset; - const uint32_t SequenceIndex = Target.SequenceIndex; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - OpenFileCache.WriteToFile( - SequenceIndex, - [&CacheFolderPath, &Content](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkData, - FileOffset, - Content.RawSizes[PathIndex]); - } - } - - bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; - if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { + ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); return true; } } @@ -5087,13 +5036,14 @@ namespace { DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { - DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); + DecompressedTemp.Write(Segment, Offset); + Offset += Segment.GetSize(); + DiskStats.WriteByteCount += Segment.GetSize(); + DiskStats.WriteCount++; } - DiskStats.WriteByteCount += RangeBuffer.GetSize(); - DiskStats.WriteCount++; return true; } return false; @@ -5128,37 +5078,80 @@ namespace { const ChunkedContentLookup& RemoteLookup, const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, IoBuffer&& CompressedPart, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteCompressedChunk"); auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs)) + if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); + return false; } else { - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); + } + if (RawHash != ChunkHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); + } - if (!AbortFlag) + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + DiskStats.ReadByteCount += SourceSize; + if (!AbortFlag) + { + for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) + { + const auto& Target = *TargetPtr; + const uint64_t FileOffset = Target.Offset + Offset; + const uint32_t SequenceIndex = Target.SequenceIndex; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + + WriteSequenceChunk(TargetFolder, + RemoteContent, + LocalWriter, + RangeBuffer, + SequenceIndex, + FileOffset, + PathIndex, + DiskStats); + } + + return true; + } + return false; + }); + + if (AbortFlag) { - WriteFileCache OpenFileCache(DiskStats); - WriteChunkToDisk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(std::move(Chunk)), - OpenFileCache); - return true; + return false; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); } + + return true; } - return false; } void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, @@ -5166,6 +5159,7 @@ namespace { const ChunkedContentLookup& RemoteLookup, uint32_t RemoteChunkIndex, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, ParallelWork& Work, WorkerThreadPool& WritePool, IoBuffer&& Payload, @@ -5230,6 +5224,7 @@ namespace { CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, + &WriteCache, &DiskStats, &WritePartsComplete, &FilteredWrittenBytesPerSecond, @@ -5264,6 +5259,7 @@ namespace { RemoteLookup, ChunkHash, ChunkTargetPtrs, + WriteCache, std::move(CompressedPart), DiskStats); if (!AbortFlag) @@ -5281,6 +5277,7 @@ namespace { std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6387,6 +6384,8 @@ namespace { } } + BufferedWriteFileCache WriteCache; + for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++) { if (AbortFlag) @@ -6487,6 +6486,7 @@ namespace { PreferredMultipartChunkSize, TotalRequestCount, TotalPartWriteCount, + &WriteCache, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { if (!AbortFlag) @@ -6535,6 +6535,7 @@ namespace { &RemoteLookup, &CacheFolderPath, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &DiskStats, @@ -6568,6 +6569,7 @@ namespace { RemoteLookup, ChunkHash, ChunkTargetPtrs, + WriteCache, std::move(CompressedPart), DiskStats); WritePartsComplete++; @@ -6583,6 +6585,7 @@ namespace { std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6608,11 +6611,12 @@ namespace { &ZenFolderPath, &Storage, BuildId = Oid(BuildId), - &PrimeCacheOnly, + PrimeCacheOnly, &RemoteContent, &RemoteLookup, &ExistsResult, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &NetworkPool, @@ -6654,6 +6658,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(BuildBlob), @@ -6683,6 +6688,7 @@ namespace { BuildId, PrimeCacheOnly, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, ChunkHash, @@ -6718,6 +6724,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(Payload), @@ -6763,6 +6770,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(BuildBlob), @@ -6800,6 +6808,7 @@ namespace { &CacheFolderPath, &LocalLookup, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &FilteredWrittenBytesPerSecond, @@ -6892,8 +6901,9 @@ namespace { tsl::robin_set<uint32_t> ChunkIndexesWritten; - BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - WriteFileCache OpenFileCache(DiskStats); + BufferedOpenFile SourceFile(SourceFilePath, DiskStats); + BufferedWriteFileCache::Local LocalWriter(WriteCache); + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) { if (AbortFlag) @@ -6940,18 +6950,17 @@ namespace { } CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); + + const uint64_t FileOffset = Op.Target->Offset; + + WriteSequenceChunk(CacheFolderPath, + RemoteContent, + LocalWriter, + ChunkSource, + RemoteSequenceIndex, + FileOffset, + RemotePathIndex, + DiskStats); CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? @@ -6960,7 +6969,7 @@ namespace { } if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) + // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) std::vector<uint32_t> CompletedChunkSequences; for (const WriteOp& Op : WriteOps) { @@ -6970,6 +6979,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + WriteCache.Close(CompletedChunkSequences); VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, RemoteLookup, @@ -7003,6 +7013,7 @@ namespace { &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &BlockDescriptions, @@ -7038,6 +7049,7 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7080,6 +7092,7 @@ namespace { &RemoteContent, &SequenceIndexChunksLeftToWriteCounters, &ExistsResult, + &WriteCache, &FilteredDownloadedBytesPerSecond, TotalRequestCount, &WritePartsComplete, @@ -7185,6 +7198,7 @@ namespace { &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WritePartsComplete, + &WriteCache, &Work, TotalPartWriteCount, &WritePool, @@ -7230,6 +7244,7 @@ namespace { BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7284,6 +7299,7 @@ namespace { &WritePool, &RemoteContent, &RemoteLookup, + &WriteCache, &CacheFolderPath, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, @@ -7388,6 +7404,7 @@ namespace { &SequenceIndexChunksLeftToWriteCounters, BlockIndex, &BlockDescriptions, + &WriteCache, &WriteChunkStats, &DiskStats, &WritePartsComplete, @@ -7429,6 +7446,7 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7753,6 +7771,7 @@ namespace { Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("FinalizeTree_Remove"); const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); SetFileReadOnlyWithRetry(LocalFilePath, false); RemoveFileWithRetry(LocalFilePath); @@ -8815,7 +8834,7 @@ namespace { if (!ChunkController) { ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); - ChunkController = CreateBasicChunkingController(); + ChunkController = CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); } LocalContent = GetLocalContent(LocalFolderScanStats, @@ -8899,7 +8918,10 @@ namespace { { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path); + + uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0)); + + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize)); FolderContent LocalFolderState; DiskStatistics DiskStats; @@ -9037,8 +9059,9 @@ namespace { ChunkedFolderContent CompareFolderContent; { - std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController(); - std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions; + std::unique_ptr<ChunkingController> ChunkController = + CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); + std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions; if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), @@ -10717,7 +10740,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ExtendableStringBuilder<256> SB; CompactBinaryToJson(MetaData, SB); - ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); + ZEN_CONSOLE("Upload Build {}, Part {} ({}) from '{}'\n{}", m_BuildId, BuildPartId, m_BuildPartName, m_Path, SB.ToView()); } const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path); |