diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-03 16:21:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-03 16:21:01 +0200 |
| commit | a0b10b046095d57ffbdb46c83084601a832f4562 (patch) | |
| tree | fe015645ea07d83c2784e3e28d0e976a37054859 /src | |
| parent | minor: fix unused variable warning on some compilers (diff) | |
| download | zen-a0b10b046095d57ffbdb46c83084601a832f4562.tar.xz zen-a0b10b046095d57ffbdb46c83084601a832f4562.zip | |
fixed size chunking for encrypted files (#410)
- Improvement: Use fixed size block chunking for know encrypted/compressed file types
- Improvement: Skip trying to compress chunks that are sourced from files that are known to be encrypted/compressed
- Improvement: Add global open file cache for written files increasing throughput during download by reducing overhead of open/close of file by 80%
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 549 | ||||
| -rw-r--r-- | src/zencore/basicfile.cpp | 10 | ||||
| -rw-r--r-- | src/zencore/blake3.cpp | 22 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 50 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 23 | ||||
| -rw-r--r-- | src/zencore/include/zencore/blake3.h | 1 | ||||
| -rw-r--r-- | src/zencore/include/zencore/iohash.h | 6 | ||||
| -rw-r--r-- | src/zenutil/bufferedwritefilecache.cpp | 177 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/chunkingcontroller.cpp | 289 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 19 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/bufferedwritefilecache.h | 106 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkingcontroller.h | 45 |
14 files changed, 873 insertions, 429 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index f6f15acb0..e13c90b4b 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -22,6 +22,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenutil/bufferedwritefilecache.h> #include <zenutil/buildstoragecache.h> #include <zenutil/chunkblock.h> #include <zenutil/chunkedcontent.h> @@ -105,6 +106,40 @@ namespace { } WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); } + static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv), + HashStringDjb2(".zip"sv), + HashStringDjb2(".7z"sv), + HashStringDjb2(".bzip"sv), + HashStringDjb2(".rar"sv), + HashStringDjb2(".gzip"sv), + HashStringDjb2(".apk"sv), + HashStringDjb2(".nsp"sv), + HashStringDjb2(".xvc"sv), + HashStringDjb2(".pkg"sv), + HashStringDjb2(".dmg"sv), + HashStringDjb2(".ipa"sv)}); + + static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end()); + + static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); } + + static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) + { + ZEN_UNUSED(Content); + const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; + if (ChunkLocationCount == 0) + { + return false; + } + const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; + const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; + + const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash); + return IsCompressable; + } + const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; const std::string ZenFolderName = ".zen"; @@ -313,7 +348,10 @@ namespace { std::atomic<uint64_t>& WriteByteCount) { BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate); - PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + } uint64_t Offset = 0; if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) { TargetFile.Write(Data, Size, Offset); @@ -600,24 +638,6 @@ namespace { return AuthToken; } - bool IsBufferDiskBased(const IoBuffer& Buffer) - { - IoBufferFileReference FileRef; - if (Buffer.GetFileReference(FileRef)) - { - return true; - } - return false; - } - - bool IsBufferDiskBased(const CompositeBuffer& Buffer) - { - // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory - std::span<const SharedBuffer> Segments = Buffer.GetSegments(); - ZEN_ASSERT(Buffer.GetSegments().size() > 0); - return IsBufferDiskBased(Segments.back().AsIoBuffer()); - } - IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash, @@ -1729,16 +1749,13 @@ namespace { { ZEN_ASSERT(false); } - uint64_t RawSize = Chunk.GetSize(); - if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) - { - // Standalone chunk, not part of a sequence - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)}; - } - else - { - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; - } + uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= MinimumSizeForCompressInBlock) && + IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = + ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; })); } @@ -1768,18 +1785,15 @@ namespace { Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); - const uint64_t RawSize = Chunk.GetSize(); - if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) - { - CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } - else - { - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } + const uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = + ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; @@ -2253,7 +2267,11 @@ namespace { { throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); } - ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash); + + const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + if (ShouldCompressChunk) { std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString(); @@ -2266,6 +2284,9 @@ namespace { fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message())); } + uint64_t StreamRawBytes = 0; + uint64_t StreamCompressedBytes = 0; + bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { @@ -2273,7 +2294,11 @@ namespace { LooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); - }); + StreamRawBytes += SourceSize; + StreamCompressedBytes += RangeBuffer.GetSize(); + }, + OodleCompressor::Mermaid, + CompressionLevel); if (CouldCompress) { uint64_t CompressedSize = CompressedFile.FileSize(); @@ -2296,22 +2321,36 @@ namespace { return Compressed.GetCompressed(); } + else + { + LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; + LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; + } CompressedFile.Close(); RemoveFile(TempFilePath, Ec); ZEN_UNUSED(Ec); } - // Try regular compress - decompress may fail if compressed data is larger than non-compressed - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource))); + CompressedBuffer CompressedBlob = + CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); if (!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } - if (!IsBufferDiskBased(CompressedBlob.GetCompressed())) + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); + + LooseChunksStats.CompressedChunkRawBytes += ChunkSize; + LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); + + // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk + if (ShouldCompressChunk) { IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash); CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); } + + LooseChunksStats.CompressedChunkCount++; return std::move(CompressedBlob).GetCompressed(); } @@ -3431,7 +3470,8 @@ namespace { LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); } - std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController(); + std::unique_ptr<ChunkingController> ChunkController = + CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); { CbObjectWriter ChunkParametersWriter; ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); @@ -4399,106 +4439,6 @@ namespace { } } - class WriteFileCache - { - public: - WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {} - ~WriteFileCache() { Flush(); } - - template<typename TBufferType> - void WriteToFile(uint32_t TargetIndex, - std::function<std::filesystem::path(uint32_t TargetIndex)>&& GetTargetPath, - const TBufferType& Buffer, - uint64_t FileOffset, - uint64_t TargetFinalSize) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile"); - if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT(OpenFileWriter); - OpenFileWriter->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - } - else - { - std::unique_ptr<BasicFile> NewOutputFile; - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open"); - Flush(); - const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); - CreateDirectories(TargetPath.parent_path()); - uint32_t Tries = 5; - NewOutputFile = - std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { - if (Tries < 3) - { - ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv); - } - if (Tries > 1) - { - Sleep(100); - } - return --Tries > 0; - }); - m_DiskStats.OpenWriteCount++; - m_DiskStats.CurrentOpenFileCount++; - } - - const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); - if (CacheWriter) - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); - - OutputFile = std::move(NewOutputFile); - if (UseSparseFiles) - { - void* Handle = OutputFile->Handle(); - if (!PrepareFileForScatteredWrite(Handle, TargetFinalSize)) - { - ZEN_DEBUG("Unable to to prepare file '{}' with size {} for random write", - GetTargetPath(TargetIndex), - TargetFinalSize); - } - } - OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u)); - - OpenFileWriter->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - SeenTargetIndexes.push_back(TargetIndex); - } - else - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write"); - NewOutputFile->Write(Buffer, FileOffset); - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += Buffer.GetSize(); - NewOutputFile = {}; - m_DiskStats.CurrentOpenFileCount--; - } - } - } - - void Flush() - { - ZEN_TRACE_CPU("WriteFileCache_Flush"); - if (OutputFile) - { - m_DiskStats.CurrentOpenFileCount--; - } - - OpenFileWriter = {}; - OutputFile = {}; - } - DiskStatistics& m_DiskStats; - std::vector<uint32_t> SeenTargetIndexes; - std::unique_ptr<BasicFile> OutputFile; - std::unique_ptr<BasicFileWriter> OpenFileWriter; - }; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const ChunkedContentLookup& Lookup, @@ -4581,7 +4521,7 @@ namespace { VerifySize, ExpectedSize)); } - ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); if (VerifyChunkHash != SequenceRawHash) { @@ -4651,6 +4591,77 @@ namespace { return CompletedSequenceIndexes; } + void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath, + const ChunkedFolderContent& RemoteContent, + BufferedWriteFileCache::Local& LocalWriter, + const CompositeBuffer& Chunk, + const uint32_t SequenceIndex, + const uint64_t FileOffset, + const uint32_t PathIndex, + DiskStatistics& DiskStats) + { + ZEN_TRACE_CPU("WriteSequenceChunk"); + + const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; + + auto OpenFile = [&](BasicFile& File) { + const std::filesystem::path FileName = + GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + File.Open(FileName, BasicFile::Mode::kWrite); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(File.Handle(), SequenceSize); + } + }; + + const uint64_t ChunkSize = Chunk.GetSize(); + ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); + if (ChunkSize == SequenceSize) + { + BasicFile SingleChunkFile; + OpenFile(SingleChunkFile); + + DiskStats.CurrentOpenFileCount++; + auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; }); + SingleChunkFile.Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + else + { + const uint64_t MaxWriterBufferSize = 256u * 1025u; + + BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); + if (Writer) + { + if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + else + { + Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); + + Writer->File = std::make_unique<BasicFile>(); + OpenFile(*Writer->File); + if (ChunkSize < MaxWriterBufferSize) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ChunkSize; + } + } + } + struct BlockWriteOps { std::vector<CompositeBuffer> ChunkBuffers; @@ -4667,13 +4678,15 @@ namespace { const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, + BufferedWriteFileCache& WriteCache, ParallelWork& Work, WorkerThreadPool& VerifyPool, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); + { - WriteFileCache OpenFileCache(DiskStats); + BufferedWriteFileCache::Local LocalWriter(WriteCache); for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { if (AbortFlag) @@ -4685,25 +4698,15 @@ namespace { ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t ChunkSize = Chunk.GetSize(); const uint64_t FileOffset = WriteOp.Target->Offset; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); - OpenFileCache.WriteToFile<CompositeBuffer>( - SequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - Chunk, - FileOffset, - RemoteContent.RawSizes[PathIndex]); + WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats); } } if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) + // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) std::vector<uint32_t> CompletedChunkSequences; for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { @@ -4713,6 +4716,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + WriteCache.Close(CompletedChunkSequences); VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); } } @@ -4864,6 +4868,7 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -4896,6 +4901,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4920,6 +4926,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4939,6 +4946,7 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); @@ -4963,6 +4971,7 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + WriteCache, Work, VerifyPool, DiskStats); @@ -4974,75 +4983,15 @@ namespace { } } - SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) - { - ZEN_TRACE_CPU("Decompress"); - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash)); - } - if (RawHash != ChunkHash) - { - throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash)); - } - if (RawSize != ChunkRawSize) - { - throw std::runtime_error( - fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize)); - } - if (!Compressed) - { - throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash)); - } - - SharedBuffer Decompressed = Compressed.Decompress(); - - if (!Decompressed) - { - throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); - } - return Decompressed; - } - - void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, - CompositeBuffer&& ChunkData, - WriteFileCache& OpenFileCache) - { - ZEN_TRACE_CPU("WriteChunkToDisk"); - - for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets) - { - const auto& Target = *TargetPtr; - const uint64_t FileOffset = Target.Offset; - const uint32_t SequenceIndex = Target.SequenceIndex; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - OpenFileCache.WriteToFile( - SequenceIndex, - [&CacheFolderPath, &Content](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkData, - FileOffset, - Content.RawSizes[PathIndex]); - } - } - - bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; - if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { + ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); return true; } } @@ -5087,13 +5036,14 @@ namespace { DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { - DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); + DecompressedTemp.Write(Segment, Offset); + Offset += Segment.GetSize(); + DiskStats.WriteByteCount += Segment.GetSize(); + DiskStats.WriteCount++; } - DiskStats.WriteByteCount += RangeBuffer.GetSize(); - DiskStats.WriteCount++; return true; } return false; @@ -5128,37 +5078,80 @@ namespace { const ChunkedContentLookup& RemoteLookup, const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, IoBuffer&& CompressedPart, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteCompressedChunk"); auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs)) + if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); + return false; } else { - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); + } + if (RawHash != ChunkHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); + } - if (!AbortFlag) + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + DiskStats.ReadByteCount += SourceSize; + if (!AbortFlag) + { + for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) + { + const auto& Target = *TargetPtr; + const uint64_t FileOffset = Target.Offset + Offset; + const uint32_t SequenceIndex = Target.SequenceIndex; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + + WriteSequenceChunk(TargetFolder, + RemoteContent, + LocalWriter, + RangeBuffer, + SequenceIndex, + FileOffset, + PathIndex, + DiskStats); + } + + return true; + } + return false; + }); + + if (AbortFlag) { - WriteFileCache OpenFileCache(DiskStats); - WriteChunkToDisk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(std::move(Chunk)), - OpenFileCache); - return true; + return false; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); } + + return true; } - return false; } void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, @@ -5166,6 +5159,7 @@ namespace { const ChunkedContentLookup& RemoteLookup, uint32_t RemoteChunkIndex, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, ParallelWork& Work, WorkerThreadPool& WritePool, IoBuffer&& Payload, @@ -5230,6 +5224,7 @@ namespace { CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, + &WriteCache, &DiskStats, &WritePartsComplete, &FilteredWrittenBytesPerSecond, @@ -5264,6 +5259,7 @@ namespace { RemoteLookup, ChunkHash, ChunkTargetPtrs, + WriteCache, std::move(CompressedPart), DiskStats); if (!AbortFlag) @@ -5281,6 +5277,7 @@ namespace { std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6387,6 +6384,8 @@ namespace { } } + BufferedWriteFileCache WriteCache; + for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++) { if (AbortFlag) @@ -6487,6 +6486,7 @@ namespace { PreferredMultipartChunkSize, TotalRequestCount, TotalPartWriteCount, + &WriteCache, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { if (!AbortFlag) @@ -6535,6 +6535,7 @@ namespace { &RemoteLookup, &CacheFolderPath, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &DiskStats, @@ -6568,6 +6569,7 @@ namespace { RemoteLookup, ChunkHash, ChunkTargetPtrs, + WriteCache, std::move(CompressedPart), DiskStats); WritePartsComplete++; @@ -6583,6 +6585,7 @@ namespace { std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6608,11 +6611,12 @@ namespace { &ZenFolderPath, &Storage, BuildId = Oid(BuildId), - &PrimeCacheOnly, + PrimeCacheOnly, &RemoteContent, &RemoteLookup, &ExistsResult, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &NetworkPool, @@ -6654,6 +6658,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(BuildBlob), @@ -6683,6 +6688,7 @@ namespace { BuildId, PrimeCacheOnly, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, ChunkHash, @@ -6718,6 +6724,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(Payload), @@ -6763,6 +6770,7 @@ namespace { RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), + WriteCache, Work, WritePool, std::move(BuildBlob), @@ -6800,6 +6808,7 @@ namespace { &CacheFolderPath, &LocalLookup, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &FilteredWrittenBytesPerSecond, @@ -6892,8 +6901,9 @@ namespace { tsl::robin_set<uint32_t> ChunkIndexesWritten; - BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - WriteFileCache OpenFileCache(DiskStats); + BufferedOpenFile SourceFile(SourceFilePath, DiskStats); + BufferedWriteFileCache::Local LocalWriter(WriteCache); + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) { if (AbortFlag) @@ -6940,18 +6950,17 @@ namespace { } CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); + + const uint64_t FileOffset = Op.Target->Offset; + + WriteSequenceChunk(CacheFolderPath, + RemoteContent, + LocalWriter, + ChunkSource, + RemoteSequenceIndex, + FileOffset, + RemotePathIndex, + DiskStats); CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? @@ -6960,7 +6969,7 @@ namespace { } if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) + // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) std::vector<uint32_t> CompletedChunkSequences; for (const WriteOp& Op : WriteOps) { @@ -6970,6 +6979,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + WriteCache.Close(CompletedChunkSequences); VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, RemoteLookup, @@ -7003,6 +7013,7 @@ namespace { &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, &Work, &WritePool, &BlockDescriptions, @@ -7038,6 +7049,7 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7080,6 +7092,7 @@ namespace { &RemoteContent, &SequenceIndexChunksLeftToWriteCounters, &ExistsResult, + &WriteCache, &FilteredDownloadedBytesPerSecond, TotalRequestCount, &WritePartsComplete, @@ -7185,6 +7198,7 @@ namespace { &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WritePartsComplete, + &WriteCache, &Work, TotalPartWriteCount, &WritePool, @@ -7230,6 +7244,7 @@ namespace { BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7284,6 +7299,7 @@ namespace { &WritePool, &RemoteContent, &RemoteLookup, + &WriteCache, &CacheFolderPath, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, @@ -7388,6 +7404,7 @@ namespace { &SequenceIndexChunksLeftToWriteCounters, BlockIndex, &BlockDescriptions, + &WriteCache, &WriteChunkStats, &DiskStats, &WritePartsComplete, @@ -7429,6 +7446,7 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache, DiskStats)) { std::error_code DummyEc; @@ -7753,6 +7771,7 @@ namespace { Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("FinalizeTree_Remove"); const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); SetFileReadOnlyWithRetry(LocalFilePath, false); RemoveFileWithRetry(LocalFilePath); @@ -8815,7 +8834,7 @@ namespace { if (!ChunkController) { ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); - ChunkController = CreateBasicChunkingController(); + ChunkController = CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); } LocalContent = GetLocalContent(LocalFolderScanStats, @@ -8899,7 +8918,10 @@ namespace { { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path); + + uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0)); + + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize)); FolderContent LocalFolderState; DiskStatistics DiskStats; @@ -9037,8 +9059,9 @@ namespace { ChunkedFolderContent CompareFolderContent; { - std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController(); - std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions; + std::unique_ptr<ChunkingController> ChunkController = + CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); + std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions; if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), @@ -10717,7 +10740,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ExtendableStringBuilder<256> SB; CompactBinaryToJson(MetaData, SB); - ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); + ZEN_CONSOLE("Upload Build {}, Part {} ({}) from '{}'\n{}", m_BuildId, BuildPartId, m_BuildPartName, m_Path, SB.ToView()); } const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path); diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp index 993f2b616..6989da67e 100644 --- a/src/zencore/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -283,7 +283,7 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec) void BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec) { - const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; + const uint64_t MaxChunkSize = 2u * 1024 * 1024; WriteFile(m_FileHandle, Data, Size, FileOffset, MaxChunkSize, Ec); } @@ -794,7 +794,7 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path) { uint64_t Offset = 0; static const uint64_t BufferingSize = 256u * 1024u; - // BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2); + BasicFileWriter BufferedOutput(Temp, Min(BufferingSize, BufferSize)); for (const SharedBuffer& Segment : Buffer.GetSegments()) { size_t SegmentSize = Segment.GetSize(); @@ -806,14 +806,14 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path) FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, - [&Temp, &Offset](const void* Data, size_t Size) { - Temp.Write(Data, Size, Offset); + [&BufferedOutput, &Offset](const void* Data, size_t Size) { + BufferedOutput.Write(Data, Size, Offset); Offset += Size; }); } else { - Temp.Write(Segment.GetData(), SegmentSize, Offset); + BufferedOutput.Write(Segment.GetData(), SegmentSize, Offset); Offset += SegmentSize; } } diff --git a/src/zencore/blake3.cpp b/src/zencore/blake3.cpp index 4a77aa49a..054f0d3a0 100644 --- a/src/zencore/blake3.cpp +++ b/src/zencore/blake3.cpp @@ -151,6 +151,28 @@ BLAKE3Stream::Append(const void* data, size_t byteCount) return *this; } +BLAKE3Stream& +BLAKE3Stream::Append(const IoBuffer& Buffer) +{ + blake3_hasher* b3h = reinterpret_cast<blake3_hasher*>(m_HashState); + + size_t BufferSize = Buffer.GetSize(); + static const uint64_t BufferingSize = 256u * 1024u; + IoBufferFileReference FileRef; + if (BufferSize >= (BufferingSize + BufferingSize / 2) && Buffer.GetFileReference(FileRef)) + { + ScanFile(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, [&b3h](const void* Data, size_t Size) { + blake3_hasher_update(b3h, Data, Size); + }); + } + else + { + blake3_hasher_update(b3h, Buffer.GetData(), BufferSize); + } + + return *this; +} + BLAKE3 BLAKE3Stream::GetHash() { diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 62b64bc9d..d9f381811 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -216,23 +216,45 @@ public: std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, uint64_t /* BlockSize */) const final { - UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); - Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + const uint64_t HeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - IoBufferFileReference FileRef = {nullptr, 0, 0}; - if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + uint64_t RawOffset = 0; + BLAKE3Stream HashStream; + + for (const SharedBuffer& Segment : RawData.GetSegments()) { - ZEN_ASSERT(FileRef.FileHandle != nullptr); - uint64_t CallbackOffset = 0; - ScanFile(FileRef.FileHandle, 0, RawData.GetSize(), 512u * 1024u, [&](const void* Data, size_t Size) { - CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); - Callback(CallbackOffset, Size, HeaderData.GetSize() + CallbackOffset, Tmp); - CallbackOffset += Size; - }); - return true; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + IoBuffer SegmentBuffer = Segment.AsIoBuffer(); + if (SegmentBuffer.GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + + ScanFile(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + 512u * 1024u, + [&](const void* Data, size_t Size) { + HashStream.Append(Data, Size); + CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size)); + Callback(RawOffset, Size, HeaderSize + RawOffset, Tmp); + RawOffset += Size; + }); + } + else + { + const uint64_t Size = SegmentBuffer.GetSize(); + HashStream.Append(SegmentBuffer); + Callback(RawOffset, Size, HeaderSize + RawOffset, CompositeBuffer(Segment)); + RawOffset += Size; + } } - Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData); + ZEN_ASSERT(RawOffset == RawData.GetSize()); + + UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), HashStream.GetHash()); + ZEN_ASSERT(HeaderData.GetSize() == HeaderSize); + Callback(0, 0, 0, CompositeBuffer(HeaderData.MoveToShared())); + return true; } }; @@ -323,7 +345,7 @@ public: ScanFile(FileRef.FileHandle, sizeof(BufferHeader) + RawOffset, RawSize, 512u * 1024u, [&](const void* Data, size_t Size) { if (Result) { - CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); + CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size)); Result = Callback(sizeof(BufferHeader) + RawOffset + CallbackOffset, Size, CallbackOffset, Tmp); } CallbackOffset += Size; diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 0a9b2a73a..c4264bc29 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -2275,23 +2275,32 @@ PrepareFileForScatteredWrite(void* FileHandle, uint64_t FinalSize) { bool Result = true; #if ZEN_PLATFORM_WINDOWS - DWORD _ = 0; - BOOL Ok = DeviceIoControl(FileHandle, FSCTL_SET_SPARSE, nullptr, 0, nullptr, 0, &_, nullptr); - if (!Ok) + + BY_HANDLE_FILE_INFORMATION Information; + if (GetFileInformationByHandle(FileHandle, &Information)) { - std::error_code DummyEc; - ZEN_DEBUG("Unable to set sparse mode for file '{}'", PathFromHandle(FileHandle, DummyEc)); - Result = false; + if ((Information.dwFileAttributes & FILE_ATTRIBUTE_SPARSE_FILE) == 0) + { + DWORD _ = 0; + BOOL Ok = DeviceIoControl(FileHandle, FSCTL_SET_SPARSE, nullptr, 0, nullptr, 0, &_, nullptr); + if (!Ok) + { + std::error_code DummyEc; + ZEN_DEBUG("Unable to set sparse mode for file '{}'", PathFromHandle(FileHandle, DummyEc)); + Result = false; + } + } } FILE_ALLOCATION_INFO AllocationInfo = {}; - AllocationInfo.AllocationSize.QuadPart = FinalSize; + AllocationInfo.AllocationSize.QuadPart = LONGLONG(FinalSize); if (!SetFileInformationByHandle(FileHandle, FileAllocationInfo, &AllocationInfo, DWORD(sizeof(AllocationInfo)))) { std::error_code DummyEc; ZEN_DEBUG("Unable to set file allocation size to {} for file '{}'", FinalSize, PathFromHandle(FileHandle, DummyEc)); Result = false; } + #else // ZEN_PLATFORM_WINDOWS ZEN_UNUSED(FileHandle, FinalSize); #endif // ZEN_PLATFORM_WINDOWS diff --git a/src/zencore/include/zencore/blake3.h b/src/zencore/include/zencore/blake3.h index 28bb348c0..f01e45266 100644 --- a/src/zencore/include/zencore/blake3.h +++ b/src/zencore/include/zencore/blake3.h @@ -53,6 +53,7 @@ struct BLAKE3Stream void Reset(); // Begin streaming hash compute (not needed on freshly constructed instance) BLAKE3Stream& Append(const void* data, size_t byteCount); // Append another chunk BLAKE3Stream& Append(MemoryView DataView) { return Append(DataView.GetData(), DataView.GetSize()); } // Append another chunk + BLAKE3Stream& Append(const IoBuffer& Buffer); // Append another chunk BLAKE3 GetHash(); // Obtain final hash. If you wish to reuse the instance call reset() private: diff --git a/src/zencore/include/zencore/iohash.h b/src/zencore/include/zencore/iohash.h index 7443e17b7..a619b0053 100644 --- a/src/zencore/include/zencore/iohash.h +++ b/src/zencore/include/zencore/iohash.h @@ -102,6 +102,12 @@ struct IoHashStream return *this; } + IoHashStream& Append(const IoBuffer& Buffer) + { + m_Blake3Stream.Append(Buffer); + return *this; + } + /// Append another chunk IoHashStream& Append(MemoryView Data) { diff --git a/src/zenutil/bufferedwritefilecache.cpp b/src/zenutil/bufferedwritefilecache.cpp new file mode 100644 index 000000000..a52850314 --- /dev/null +++ b/src/zenutil/bufferedwritefilecache.cpp @@ -0,0 +1,177 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/bufferedwritefilecache.h> + +#include <zencore/logging.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +BufferedWriteFileCache::BufferedWriteFileCache() : m_CacheHitCount(0), m_CacheMissCount(0), m_OpenHandleCount(0), m_DroppedHandleCount(0) +{ +} + +BufferedWriteFileCache::~BufferedWriteFileCache() +{ + ZEN_TRACE_CPU("~BufferedWriteFileCache()"); + + try + { + for (TOpenHandles& OpenHandles : m_OpenFiles) + { + while (BasicFile* File = OpenHandles.Pop()) + { + std::unique_ptr<BasicFile> FileToClose(File); + m_OpenHandleCount--; + } + } + m_OpenFiles.clear(); + m_ChunkWriters.clear(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BufferedWriteFileCache() threw exeption: {}", Ex.what()); + } +} + +std::unique_ptr<BasicFile> +BufferedWriteFileCache::Get(uint32_t FileIndex) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Get"); + + RwLock::ExclusiveLockScope _(m_WriterLock); + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + if (BasicFile* File = OpenHandles.Pop(); File != nullptr) + { + m_OpenHandleCount--; + m_CacheHitCount++; + return std::unique_ptr<BasicFile>(File); + } + } + m_CacheMissCount++; + return nullptr; +} + +void +BufferedWriteFileCache::Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Put"); + + if (m_OpenHandleCount.load() >= MaxBufferedCount) + { + m_DroppedHandleCount++; + return; + } + RwLock::ExclusiveLockScope _(m_WriterLock); + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + if (OpenHandles.Push(Writer.get())) + { + Writer.release(); + m_OpenHandleCount++; + } + else + { + m_DroppedHandleCount++; + } + } + else + { + const uint32_t HandleIndex = gsl::narrow<uint32_t>(m_OpenFiles.size()); + m_OpenFiles.push_back(TOpenHandles{}); + m_OpenFiles.back().Push(Writer.release()); + m_ChunkWriters.insert_or_assign(FileIndex, HandleIndex); + m_OpenHandleCount++; + } +} + +void +BufferedWriteFileCache::Close(std::span<uint32_t> FileIndexes) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Close"); + + std::vector<std::unique_ptr<BasicFile>> FilesToClose; + FilesToClose.reserve(FileIndexes.size()); + { + RwLock::ExclusiveLockScope _(m_WriterLock); + for (uint32_t FileIndex : FileIndexes) + { + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + while (BasicFile* File = OpenHandles.Pop()) + { + FilesToClose.emplace_back(std::unique_ptr<BasicFile>(File)); + m_OpenHandleCount--; + } + m_ChunkWriters.erase(It); + } + } + } + FilesToClose.clear(); +} + +BufferedWriteFileCache::Local::Local(BufferedWriteFileCache& Cache) : m_Cache(Cache) +{ +} + +BufferedWriteFileCache::Local::Writer* +BufferedWriteFileCache::Local::GetWriter(uint32_t FileIndex) +{ + if (auto It = m_FileIndexToWriterIndex.find(FileIndex); It != m_FileIndexToWriterIndex.end()) + { + return m_ChunkWriters[It->second].get(); + } + std::unique_ptr<BasicFile> File = m_Cache.Get(FileIndex); + if (File) + { + const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size()); + m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex); + m_ChunkWriters.emplace_back(std::make_unique<Writer>(Writer{.File = std::move(File)})); + return m_ChunkWriters.back().get(); + } + return nullptr; +} + +BufferedWriteFileCache::Local::Writer* +BufferedWriteFileCache::Local::PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer) +{ + ZEN_ASSERT(!m_FileIndexToWriterIndex.contains(FileIndex)); + const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size()); + m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex); + m_ChunkWriters.emplace_back(std::move(Writer)); + return m_ChunkWriters.back().get(); +} + +BufferedWriteFileCache::Local::~Local() +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::~Local()"); + try + { + for (auto& It : m_FileIndexToWriterIndex) + { + const uint32_t FileIndex = It.first; + const uint32_t WriterIndex = It.second; + m_ChunkWriters[WriterIndex]->Writer.reset(); + std::unique_ptr<BasicFile> File; + File.swap(m_ChunkWriters[WriterIndex]->File); + m_Cache.Put(FileIndex, std::move(File)); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("BufferedWriteFileCache::~Local() threw exeption: {}", Ex.what()); + } +} + +} // namespace zen diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index ae129324e..4bec4901a 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -891,8 +891,12 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) } Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1); + Result.PathExtensionHash.resize(Content.Paths.size()); for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++) { + std::string LowercaseExtension = Content.Paths[PathIndex].extension().string(); + std::transform(LowercaseExtension.begin(), LowercaseExtension.end(), LowercaseExtension.begin(), ::tolower); + Result.PathExtensionHash[PathIndex] = HashStringDjb2(LowercaseExtension); if (Content.RawSizes[PathIndex] > 0) { const IoHash& RawHash = Content.RawHashes[PathIndex]; diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp index a5ebce193..6fb4182c0 100644 --- a/src/zenutil/chunkingcontroller.cpp +++ b/src/zenutil/chunkingcontroller.cpp @@ -4,6 +4,7 @@ #include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> #include <zencore/trace.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -35,32 +36,54 @@ namespace { return ChunkedParams{.UseThreshold = UseThreshold, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize}; } -} // namespace + void WriteChunkParams(CbObjectWriter& Writer, const ChunkedParams& Params) + { + Writer.BeginObject("ChunkingParams"sv); + { + Writer.AddBool("UseThreshold"sv, Params.UseThreshold); -class BasicChunkingController : public ChunkingController -{ -public: - BasicChunkingController(std::span<const std::string_view> ExcludeExtensions, - bool ExcludeElfFiles, - bool ExcludeMachOFiles, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams) - : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end()) - , m_ExcludeElfFiles(ExcludeElfFiles) - , m_ExcludeMachOFiles(ExcludeMachOFiles) - , m_ChunkFileSizeLimit(ChunkFileSizeLimit) - , m_ChunkingParams(ChunkingParams) + Writer.AddInteger("MinSize"sv, (uint64_t)Params.MinSize); + Writer.AddInteger("MaxSize"sv, (uint64_t)Params.MaxSize); + Writer.AddInteger("AvgSize"sv, (uint64_t)Params.AvgSize); + } + Writer.EndObject(); // ChunkingParams + } + + bool IsElfFile(BasicFile& Buffer) { + if (Buffer.FileSize() > 4) + { + uint32_t ElfCheck = 0; + Buffer.Read(&ElfCheck, 4, 0); + if (ElfCheck == 0x464c457f) + { + return true; + } + } + return false; } - BasicChunkingController(CbObjectView Parameters) - : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView())) - , m_ExcludeElfFiles(Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles)) - , m_ExcludeMachOFiles(Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles)) - , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit)) - , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())) + bool IsMachOFile(BasicFile& Buffer) { + if (Buffer.FileSize() > 4) + { + uint32_t MachOCheck = 0; + Buffer.Read(&MachOCheck, 4, 0); + if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe)) + { + return true; + } + } + return false; } +} // namespace + +class BasicChunkingController : public ChunkingController +{ +public: + BasicChunkingController(const BasicChunkingControllerSettings& Settings) : m_Settings(Settings) {} + + BasicChunkingController(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {} virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, @@ -70,35 +93,25 @@ public: { ZEN_TRACE_CPU("BasicChunkingController::ProcessFile"); const bool ExcludeFromChunking = - std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) != - m_ChunkExcludeExtensions.end(); + std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) != + m_Settings.ExcludeExtensions.end(); - if (ExcludeFromChunking || (RawSize < m_ChunkFileSizeLimit)) + if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit)) { return false; } BasicFile Buffer(InputPath, BasicFile::Mode::kRead); - if (m_ExcludeElfFiles && Buffer.FileSize() > 4) + if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer)) { - uint32_t ElfCheck = 0; - Buffer.Read(&ElfCheck, 4, 0); - if (ElfCheck == 0x464c457f) - { - return false; - } + return false; } - if (m_ExcludeMachOFiles && Buffer.FileSize() > 4) + if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer)) { - uint32_t MachOCheck = 0; - Buffer.Read(&MachOCheck, 4, 0); - if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe)) - { - return false; - } + return false; } - OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag); + OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag); return true; } @@ -109,59 +122,43 @@ public: CbObjectWriter Writer; Writer.BeginArray("ChunkExcludeExtensions"sv); { - for (const std::string& Extension : m_ChunkExcludeExtensions) + for (const std::string& Extension : m_Settings.ExcludeExtensions) { Writer.AddString(Extension); } } Writer.EndArray(); // ChunkExcludeExtensions - Writer.AddBool("ExcludeElfFiles"sv, m_ExcludeElfFiles); - Writer.AddBool("ExcludeMachOFiles"sv, m_ExcludeMachOFiles); + Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles); + Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles); + Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit); - Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit); - Writer.BeginObject("ChunkingParams"sv); - { - Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold); + WriteChunkParams(Writer, m_Settings.ChunkingParams); - Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize); - Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize); - Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize); - } - Writer.EndObject(); // ChunkingParams return Writer.Save(); } static constexpr std::string_view Name = "BasicChunkingController"sv; -protected: - const std::vector<std::string> m_ChunkExcludeExtensions; - const bool m_ExcludeElfFiles = false; - const bool m_ExcludeMachOFiles = false; - const uint64_t m_ChunkFileSizeLimit; - const ChunkedParams m_ChunkingParams; +private: + static BasicChunkingControllerSettings ReadSettings(CbObjectView Parameters) + { + return BasicChunkingControllerSettings{ + .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()), + .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles), + .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles), + .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit), + .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())}; + } + + const BasicChunkingControllerSettings m_Settings; }; class ChunkingControllerWithFixedChunking : public ChunkingController { public: - ChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams, - uint32_t FixedChunkingChunkSize) - : m_FixedChunkingExtensions(FixedChunkingExtensions.begin(), FixedChunkingExtensions.end()) - , m_ChunkFileSizeLimit(ChunkFileSizeLimit) - , m_ChunkingParams(ChunkingParams) - , m_FixedChunkingChunkSize(FixedChunkingChunkSize) - { - } + ChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Settings) : m_Settings(Settings) {} - ChunkingControllerWithFixedChunking(CbObjectView Parameters) - : m_FixedChunkingExtensions(ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView())) - , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit)) - , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())) - , m_FixedChunkingChunkSize(Parameters["FixedChunkingChunkSize"sv].AsUInt32(16u * 1024u * 1024u)) - { - } + ChunkingControllerWithFixedChunking(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {} virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, @@ -170,33 +167,71 @@ public: std::atomic<bool>& AbortFlag) const override { ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile"); - if (RawSize < m_ChunkFileSizeLimit) + const bool ExcludeFromChunking = + std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) != + m_Settings.ExcludeExtensions.end(); + + if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit)) { return false; } - const bool FixedChunking = std::find(m_FixedChunkingExtensions.begin(), m_FixedChunkingExtensions.end(), InputPath.extension()) != - m_FixedChunkingExtensions.end(); - if (FixedChunking) + const bool FixedChunkingExtension = + std::find(m_Settings.FixedChunkingExtensions.begin(), m_Settings.FixedChunkingExtensions.end(), InputPath.extension()) != + m_Settings.FixedChunkingExtensions.end(); + + if (FixedChunkingExtension) { + if (RawSize < m_Settings.MinSizeForFixedChunking) + { + return false; + } ZEN_TRACE_CPU("FixedChunking"); - IoHashStream FullHash; - IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath); + IoHashStream FullHasher; + BasicFile Source(InputPath, BasicFile::Mode::kRead); uint64_t Offset = 0; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize)); + const uint64_t ExpectedChunkCount = 1 + (RawSize / m_Settings.FixedChunkingChunkSize); + ChunkHashToChunkIndex.reserve(ExpectedChunkCount); + OutChunked.Info.ChunkHashes.reserve(ExpectedChunkCount); + OutChunked.Info.ChunkSequence.reserve(ExpectedChunkCount); + OutChunked.ChunkSources.reserve(ExpectedChunkCount); + + static const uint64_t BufferingSize = 256u * 1024u; + + IoHashStream ChunkHasher; + while (Offset < RawSize) { if (AbortFlag) { return false; } - uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize); - IoBuffer Chunk(Source, Offset, ChunkSize); - MemoryView ChunkData = Chunk.GetView(); - FullHash.Append(ChunkData); - IoHash ChunkHash = IoHash::HashBuffer(ChunkData); + ChunkHasher.Reset(); + + uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_Settings.FixedChunkingChunkSize); + if (ChunkSize >= (BufferingSize + BufferingSize / 2)) + { + ScanFile(Source.Handle(), + Offset, + ChunkSize, + BufferingSize, + [&FullHasher, &ChunkHasher, &BytesProcessed](const void* Data, size_t Size) { + FullHasher.Append(Data, Size); + ChunkHasher.Append(Data, Size); + BytesProcessed.fetch_add(Size); + }); + } + else + { + IoBuffer ChunkData = Source.ReadRange(Offset, ChunkSize); + FullHasher.Append(ChunkData); + ChunkHasher.Append(ChunkData); + BytesProcessed.fetch_add(ChunkSize); + } + + const IoHash ChunkHash = ChunkHasher.GetHash(); if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) { OutChunked.Info.ChunkSequence.push_back(It->second); @@ -209,16 +244,24 @@ public: OutChunked.ChunkSources.push_back({.Offset = Offset, .Size = gsl::narrow<uint32_t>(ChunkSize)}); } Offset += ChunkSize; - BytesProcessed.fetch_add(ChunkSize); } OutChunked.Info.RawSize = RawSize; - OutChunked.Info.RawHash = FullHash.GetHash(); + OutChunked.Info.RawHash = FullHasher.GetHash(); return true; } else { BasicFile Buffer(InputPath, BasicFile::Mode::kRead); - OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed); + if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer)) + { + return false; + } + if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer)) + { + return false; + } + + OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag); return true; } } @@ -230,47 +273,57 @@ public: CbObjectWriter Writer; Writer.BeginArray("FixedChunkingExtensions"); { - for (const std::string& Extension : m_FixedChunkingExtensions) + for (const std::string& Extension : m_Settings.FixedChunkingExtensions) { Writer.AddString(Extension); } } Writer.EndArray(); // ChunkExcludeExtensions - Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit); - Writer.BeginObject("ChunkingParams"sv); - { - Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold); - Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize); - Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize); - Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize); + Writer.BeginArray("ChunkExcludeExtensions"sv); + { + for (const std::string& Extension : m_Settings.ExcludeExtensions) + { + Writer.AddString(Extension); + } } - Writer.EndObject(); // ChunkingParams - Writer.AddInteger("FixedChunkingChunkSize"sv, m_FixedChunkingChunkSize); + Writer.EndArray(); // ChunkExcludeExtensions + + Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles); + Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles); + + Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit); + + WriteChunkParams(Writer, m_Settings.ChunkingParams); + + Writer.AddInteger("FixedChunkingChunkSize"sv, m_Settings.FixedChunkingChunkSize); + Writer.AddInteger("MinSizeForFixedChunking"sv, m_Settings.MinSizeForFixedChunking); return Writer.Save(); } static constexpr std::string_view Name = "ChunkingControllerWithFixedChunking"sv; -protected: - const std::vector<std::string> m_FixedChunkingExtensions; - const uint64_t m_ChunkFileSizeLimit; - const ChunkedParams m_ChunkingParams; - const uint32_t m_FixedChunkingChunkSize; +private: + static ChunkingControllerWithFixedChunkingSettings ReadSettings(CbObjectView Parameters) + { + return ChunkingControllerWithFixedChunkingSettings{ + .FixedChunkingExtensions = ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()), + .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()), + .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles), + .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles), + .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit), + .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()), + .FixedChunkingChunkSize = Parameters["FixedChunkingChunkSize"sv].AsUInt64(DefaultFixedChunkingChunkSize), + .MinSizeForFixedChunking = Parameters["MinSizeForFixedChunking"sv].AsUInt64(DefaultFixedChunkingChunkSize)}; + } + + const ChunkingControllerWithFixedChunkingSettings m_Settings; }; std::unique_ptr<ChunkingController> -CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions, - bool ExcludeElfFiles, - bool ExcludeMachOFiles, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams) +CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings) { - return std::make_unique<BasicChunkingController>(ExcludeExtensions, - ExcludeElfFiles, - ExcludeMachOFiles, - ChunkFileSizeLimit, - ChunkingParams); + return std::make_unique<BasicChunkingController>(Settings); } std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters) @@ -279,15 +332,9 @@ CreateBasicChunkingController(CbObjectView Parameters) } std::unique_ptr<ChunkingController> -CreateChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams, - uint32_t FixedChunkingChunkSize) +CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting) { - return std::make_unique<ChunkingControllerWithFixedChunking>(FixedChunkingExtensions, - ChunkFileSizeLimit, - ChunkingParams, - FixedChunkingChunkSize); + return std::make_unique<ChunkingControllerWithFixedChunking>(Setting); } std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters) diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index badfb4840..c389d16c5 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -678,13 +678,24 @@ protected: { return false; } - CompositeBuffer Decompressed = ValidateBuffer.DecompressToComposite(); - if (!Decompressed) + + IoHashStream Hash; + bool CouldDecompress = ValidateBuffer.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; + }); + if (!CouldDecompress) { return false; } - IoHash Hash = IoHash::HashBuffer(Decompressed); - if (Hash != RawHash) + if (Hash.GetHash() != VerifyHash) { return false; } diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h new file mode 100644 index 000000000..68d6c375e --- /dev/null +++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h @@ -0,0 +1,106 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/basicfile.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +class CompositeBuffer; + +class BufferedWriteFileCache +{ +public: + BufferedWriteFileCache(const BufferedWriteFileCache&) = delete; + BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete; + + BufferedWriteFileCache(); + + ~BufferedWriteFileCache(); + + std::unique_ptr<BasicFile> Get(uint32_t FileIndex); + + void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer); + + void Close(std::span<uint32_t> FileIndexes); + + class Local + { + public: + struct Writer + { + std::unique_ptr<BasicFile> File; + std::unique_ptr<BasicFileWriter> Writer; + + inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset) + { + if (Writer) + { + Writer->Write(Chunk, FileOffset); + } + else + { + File->Write(Chunk, FileOffset); + } + } + }; + + Local(const Local&) = delete; + Local& operator=(const Local&) = delete; + + explicit Local(BufferedWriteFileCache& Cache); + ~Local(); + + Writer* GetWriter(uint32_t FileIndex); + Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer); + + private: + tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex; + std::vector<std::unique_ptr<Writer>> m_ChunkWriters; + BufferedWriteFileCache& m_Cache; + }; + +private: + static constexpr size_t MaxHandlesPerPath = 7; + static constexpr size_t MaxBufferedCount = 1024; + struct TOpenHandles + { + BasicFile* Files[MaxHandlesPerPath]; + uint64_t Size = 0; + inline BasicFile* Pop() + { + if (Size > 0) + { + return Files[--Size]; + } + else + { + return nullptr; + } + } + inline bool Push(BasicFile* File) + { + if (Size < MaxHandlesPerPath) + { + Files[Size++] = File; + return true; + } + return false; + } + }; + static_assert(sizeof(TOpenHandles) == 64); + + RwLock m_WriterLock; + tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters; + std::vector<TOpenHandles> m_OpenFiles; + std::atomic<uint32_t> m_CacheHitCount; + std::atomic<uint32_t> m_CacheMissCount; + std::atomic<uint32_t> m_OpenHandleCount; + std::atomic<uint32_t> m_DroppedHandleCount; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index d33869be2..03f52e5f6 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -135,6 +135,7 @@ struct ChunkedContentLookup ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash + std::vector<uint32_t> PathExtensionHash; }; ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h index 970917fb0..315502265 100644 --- a/src/zenutil/include/zenutil/chunkingcontroller.h +++ b/src/zenutil/include/zenutil/chunkingcontroller.h @@ -11,9 +11,11 @@ namespace zen { -const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"}; -const bool DefaultChunkingExcludeElfFiles = true; -const bool DefaultChunkingExcludeMachOFiles = true; +const std::vector<std::string> DefaultChunkingExcludeExtensions = + {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"}; +const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"}; +const bool DefaultChunkingExcludeElfFiles = true; +const bool DefaultChunkingExcludeMachOFiles = true; const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u, .MaxSize = 128u * 1024u, @@ -21,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128 const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize; -const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u; +const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u; +const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u; struct ChunkedInfoWithSource; @@ -40,19 +43,31 @@ public: virtual CbObject GetParameters() const = 0; }; -std::unique_ptr<ChunkingController> CreateBasicChunkingController( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles, - bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams); +struct BasicChunkingControllerSettings +{ + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; +}; + +std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings); std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters); -std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams, - uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize); +struct ChunkingControllerWithFixedChunkingSettings +{ + std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions; + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; + uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize; + uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking; +}; + +std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting); std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters); std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters); |