aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-03 16:21:01 +0200
committerGitHub Enterprise <[email protected]>2025-06-03 16:21:01 +0200
commita0b10b046095d57ffbdb46c83084601a832f4562 (patch)
treefe015645ea07d83c2784e3e28d0e976a37054859 /src
parentminor: fix unused variable warning on some compilers (diff)
downloadzen-a0b10b046095d57ffbdb46c83084601a832f4562.tar.xz
zen-a0b10b046095d57ffbdb46c83084601a832f4562.zip
fixed size chunking for encrypted files (#410)
- Improvement: Use fixed size block chunking for know encrypted/compressed file types - Improvement: Skip trying to compress chunks that are sourced from files that are known to be encrypted/compressed - Improvement: Add global open file cache for written files increasing throughput during download by reducing overhead of open/close of file by 80%
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp549
-rw-r--r--src/zencore/basicfile.cpp10
-rw-r--r--src/zencore/blake3.cpp22
-rw-r--r--src/zencore/compress.cpp50
-rw-r--r--src/zencore/filesystem.cpp23
-rw-r--r--src/zencore/include/zencore/blake3.h1
-rw-r--r--src/zencore/include/zencore/iohash.h6
-rw-r--r--src/zenutil/bufferedwritefilecache.cpp177
-rw-r--r--src/zenutil/chunkedcontent.cpp4
-rw-r--r--src/zenutil/chunkingcontroller.cpp289
-rw-r--r--src/zenutil/filebuildstorage.cpp19
-rw-r--r--src/zenutil/include/zenutil/bufferedwritefilecache.h106
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h1
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h45
14 files changed, 873 insertions, 429 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index f6f15acb0..e13c90b4b 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -22,6 +22,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenutil/bufferedwritefilecache.h>
#include <zenutil/buildstoragecache.h>
#include <zenutil/chunkblock.h>
#include <zenutil/chunkedcontent.h>
@@ -105,6 +106,40 @@ namespace {
}
WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); }
+ static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv),
+ HashStringDjb2(".zip"sv),
+ HashStringDjb2(".7z"sv),
+ HashStringDjb2(".bzip"sv),
+ HashStringDjb2(".rar"sv),
+ HashStringDjb2(".gzip"sv),
+ HashStringDjb2(".apk"sv),
+ HashStringDjb2(".nsp"sv),
+ HashStringDjb2(".xvc"sv),
+ HashStringDjb2(".pkg"sv),
+ HashStringDjb2(".dmg"sv),
+ HashStringDjb2(".ipa"sv)});
+
+ static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end());
+
+ static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); }
+
+ static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+ {
+ ZEN_UNUSED(Content);
+ const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
+ if (ChunkLocationCount == 0)
+ {
+ return false;
+ }
+ const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
+ const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex];
+
+ const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash);
+ return IsCompressable;
+ }
+
const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u;
const std::string ZenFolderName = ".zen";
@@ -313,7 +348,10 @@ namespace {
std::atomic<uint64_t>& WriteByteCount)
{
BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate);
- PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ }
uint64_t Offset = 0;
if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) {
TargetFile.Write(Data, Size, Offset);
@@ -600,24 +638,6 @@ namespace {
return AuthToken;
}
- bool IsBufferDiskBased(const IoBuffer& Buffer)
- {
- IoBufferFileReference FileRef;
- if (Buffer.GetFileReference(FileRef))
- {
- return true;
- }
- return false;
- }
-
- bool IsBufferDiskBased(const CompositeBuffer& Buffer)
- {
- // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
- std::span<const SharedBuffer> Segments = Buffer.GetSegments();
- ZEN_ASSERT(Buffer.GetSegments().size() > 0);
- return IsBufferDiskBased(Segments.back().AsIoBuffer());
- }
-
IoBuffer WriteToTempFile(CompositeBuffer&& Buffer,
const std::filesystem::path& TempFolderPath,
const IoHash& Hash,
@@ -1729,16 +1749,13 @@ namespace {
{
ZEN_ASSERT(false);
}
- uint64_t RawSize = Chunk.GetSize();
- if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
- {
- // Standalone chunk, not part of a sequence
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)};
- }
- else
- {
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
- }
+ uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= MinimumSizeForCompressInBlock) &&
+ IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
}));
}
@@ -1768,18 +1785,15 @@ namespace {
Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
- const uint64_t RawSize = Chunk.GetSize();
- if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
- {
- CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
- }
- else
- {
- CompositeBuffer CompressedChunk =
- CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
- }
+ const uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
}
return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
};
@@ -2253,7 +2267,11 @@ namespace {
{
throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
}
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
+
+ const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ if (ShouldCompressChunk)
{
std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString();
@@ -2266,6 +2284,9 @@ namespace {
fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
}
+ uint64_t StreamRawBytes = 0;
+ uint64_t StreamCompressedBytes = 0;
+
bool CouldCompress = CompressedBuffer::CompressToStream(
CompositeBuffer(SharedBuffer(RawSource)),
[&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
@@ -2273,7 +2294,11 @@ namespace {
LooseChunksStats.CompressedChunkRawBytes += SourceSize;
CompressedFile.Write(RangeBuffer, Offset);
LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
- });
+ StreamRawBytes += SourceSize;
+ StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ OodleCompressor::Mermaid,
+ CompressionLevel);
if (CouldCompress)
{
uint64_t CompressedSize = CompressedFile.FileSize();
@@ -2296,22 +2321,36 @@ namespace {
return Compressed.GetCompressed();
}
+ else
+ {
+ LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes;
+ LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
+ }
CompressedFile.Close();
RemoveFile(TempFilePath, Ec);
ZEN_UNUSED(Ec);
}
- // Try regular compress - decompress may fail if compressed data is larger than non-compressed
- CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)));
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
if (!CompressedBlob)
{
throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
}
- if (!IsBufferDiskBased(CompressedBlob.GetCompressed()))
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
+
+ LooseChunksStats.CompressedChunkRawBytes += ChunkSize;
+ LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
+
+ // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk
+ if (ShouldCompressChunk)
{
IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash);
CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
}
+
+ LooseChunksStats.CompressedChunkCount++;
return std::move(CompressedBlob).GetCompressed();
}
@@ -3431,7 +3470,8 @@ namespace {
LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
}
- std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController();
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
{
CbObjectWriter ChunkParametersWriter;
ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
@@ -4399,106 +4439,6 @@ namespace {
}
}
- class WriteFileCache
- {
- public:
- WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
- ~WriteFileCache() { Flush(); }
-
- template<typename TBufferType>
- void WriteToFile(uint32_t TargetIndex,
- std::function<std::filesystem::path(uint32_t TargetIndex)>&& GetTargetPath,
- const TBufferType& Buffer,
- uint64_t FileOffset,
- uint64_t TargetFinalSize)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile");
- if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
- ZEN_ASSERT(OpenFileWriter);
- OpenFileWriter->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- }
- else
- {
- std::unique_ptr<BasicFile> NewOutputFile;
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open");
- Flush();
- const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
- CreateDirectories(TargetPath.parent_path());
- uint32_t Tries = 5;
- NewOutputFile =
- std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
- if (Tries < 3)
- {
- ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
- }
- if (Tries > 1)
- {
- Sleep(100);
- }
- return --Tries > 0;
- });
- m_DiskStats.OpenWriteCount++;
- m_DiskStats.CurrentOpenFileCount++;
- }
-
- const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
- if (CacheWriter)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
- ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());
-
- OutputFile = std::move(NewOutputFile);
- if (UseSparseFiles)
- {
- void* Handle = OutputFile->Handle();
- if (!PrepareFileForScatteredWrite(Handle, TargetFinalSize))
- {
- ZEN_DEBUG("Unable to to prepare file '{}' with size {} for random write",
- GetTargetPath(TargetIndex),
- TargetFinalSize);
- }
- }
- OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
-
- OpenFileWriter->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- SeenTargetIndexes.push_back(TargetIndex);
- }
- else
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
- NewOutputFile->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- NewOutputFile = {};
- m_DiskStats.CurrentOpenFileCount--;
- }
- }
- }
-
- void Flush()
- {
- ZEN_TRACE_CPU("WriteFileCache_Flush");
- if (OutputFile)
- {
- m_DiskStats.CurrentOpenFileCount--;
- }
-
- OpenFileWriter = {};
- OutputFile = {};
- }
- DiskStatistics& m_DiskStats;
- std::vector<uint32_t> SeenTargetIndexes;
- std::unique_ptr<BasicFile> OutputFile;
- std::unique_ptr<BasicFileWriter> OpenFileWriter;
- };
-
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets(
std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const ChunkedContentLookup& Lookup,
@@ -4581,7 +4521,7 @@ namespace {
VerifySize,
ExpectedSize));
}
- ZEN_TRACE_CPU("HashSequence");
+
const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
if (VerifyChunkHash != SequenceRawHash)
{
@@ -4651,6 +4591,77 @@ namespace {
return CompletedSequenceIndexes;
}
+ void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ BufferedWriteFileCache::Local& LocalWriter,
+ const CompositeBuffer& Chunk,
+ const uint32_t SequenceIndex,
+ const uint64_t FileOffset,
+ const uint32_t PathIndex,
+ DiskStatistics& DiskStats)
+ {
+ ZEN_TRACE_CPU("WriteSequenceChunk");
+
+ const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex];
+
+ auto OpenFile = [&](BasicFile& File) {
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ File.Open(FileName, BasicFile::Mode::kWrite);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
+ }
+ };
+
+ const uint64_t ChunkSize = Chunk.GetSize();
+ ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
+ if (ChunkSize == SequenceSize)
+ {
+ BasicFile SingleChunkFile;
+ OpenFile(SingleChunkFile);
+
+ DiskStats.CurrentOpenFileCount++;
+ auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; });
+ SingleChunkFile.Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ const uint64_t MaxWriterBufferSize = 256u * 1025u;
+
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
+ if (Writer)
+ {
+ if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+ OpenFile(*Writer->File);
+ if (ChunkSize < MaxWriterBufferSize)
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ }
+ }
+
struct BlockWriteOps
{
std::vector<CompositeBuffer> ChunkBuffers;
@@ -4667,13 +4678,15 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
+ BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
WorkerThreadPool& VerifyPool,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
+
{
- WriteFileCache OpenFileCache(DiskStats);
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
if (AbortFlag)
@@ -4685,25 +4698,15 @@ namespace {
ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
const uint64_t FileOffset = WriteOp.Target->Offset;
const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
+ WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats);
}
}
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
@@ -4713,6 +4716,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ WriteCache.Close(CompletedChunkSequences);
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4864,6 +4868,7 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -4896,6 +4901,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4920,6 +4926,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4939,6 +4946,7 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
@@ -4963,6 +4971,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4974,75 +4983,15 @@ namespace {
}
}
- SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
- {
- ZEN_TRACE_CPU("Decompress");
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash));
- }
- if (RawHash != ChunkHash)
- {
- throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash));
- }
- if (RawSize != ChunkRawSize)
- {
- throw std::runtime_error(
- fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize));
- }
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash));
- }
-
- SharedBuffer Decompressed = Compressed.Decompress();
-
- if (!Decompressed)
- {
- throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
- }
- return Decompressed;
- }
-
- void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
- CompositeBuffer&& ChunkData,
- WriteFileCache& OpenFileCache)
- {
- ZEN_TRACE_CPU("WriteChunkToDisk");
-
- for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets)
- {
- const auto& Target = *TargetPtr;
- const uint64_t FileOffset = Target.Offset;
- const uint32_t SequenceIndex = Target.SequenceIndex;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
-
- OpenFileCache.WriteToFile(
- SequenceIndex,
- [&CacheFolderPath, &Content](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkData,
- FileOffset,
- Content.RawSizes[PathIndex]);
- }
- }
-
- bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+ bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
{
if (Locations.size() == 1)
{
const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
- if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+ if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
{
+ ZEN_ASSERT_SLOW(Locations[0]->Offset == 0);
return true;
}
}
@@ -5087,13 +5036,14 @@ namespace {
DiskStats.ReadByteCount += SourceSize;
if (!AbortFlag)
{
- DecompressedTemp.Write(RangeBuffer, Offset);
for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
{
Hash.Append(Segment.GetView());
+ DecompressedTemp.Write(Segment, Offset);
+ Offset += Segment.GetSize();
+ DiskStats.WriteByteCount += Segment.GetSize();
+ DiskStats.WriteCount++;
}
- DiskStats.WriteByteCount += RangeBuffer.GetSize();
- DiskStats.WriteCount++;
return true;
}
return false;
@@ -5128,37 +5078,80 @@ namespace {
const ChunkedContentLookup& RemoteLookup,
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
IoBuffer&& CompressedPart,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteCompressedChunk");
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
+ if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs))
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
+ return false;
}
else
{
- SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash));
+ }
+ if (RawHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash));
+ }
- if (!AbortFlag)
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ IoHashStream Hash;
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
+ {
+ for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs)
+ {
+ const auto& Target = *TargetPtr;
+ const uint64_t FileOffset = Target.Offset + Offset;
+ const uint32_t SequenceIndex = Target.SequenceIndex;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunk(TargetFolder,
+ RemoteContent,
+ LocalWriter,
+ RangeBuffer,
+ SequenceIndex,
+ FileOffset,
+ PathIndex,
+ DiskStats);
+ }
+
+ return true;
+ }
+ return false;
+ });
+
+ if (AbortFlag)
{
- WriteFileCache OpenFileCache(DiskStats);
- WriteChunkToDisk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(std::move(Chunk)),
- OpenFileCache);
- return true;
+ return false;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash));
}
+
+ return true;
}
- return false;
}
void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
@@ -5166,6 +5159,7 @@ namespace {
const ChunkedContentLookup& RemoteLookup,
uint32_t RemoteChunkIndex,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
@@ -5230,6 +5224,7 @@ namespace {
CompressedChunkPath,
RemoteChunkIndex,
TotalPartWriteCount,
+ &WriteCache,
&DiskStats,
&WritePartsComplete,
&FilteredWrittenBytesPerSecond,
@@ -5264,6 +5259,7 @@ namespace {
RemoteLookup,
ChunkHash,
ChunkTargetPtrs,
+ WriteCache,
std::move(CompressedPart),
DiskStats);
if (!AbortFlag)
@@ -5281,6 +5277,7 @@ namespace {
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6387,6 +6384,8 @@ namespace {
}
}
+ BufferedWriteFileCache WriteCache;
+
for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++)
{
if (AbortFlag)
@@ -6487,6 +6486,7 @@ namespace {
PreferredMultipartChunkSize,
TotalRequestCount,
TotalPartWriteCount,
+ &WriteCache,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
if (!AbortFlag)
@@ -6535,6 +6535,7 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&DiskStats,
@@ -6568,6 +6569,7 @@ namespace {
RemoteLookup,
ChunkHash,
ChunkTargetPtrs,
+ WriteCache,
std::move(CompressedPart),
DiskStats);
WritePartsComplete++;
@@ -6583,6 +6585,7 @@ namespace {
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6608,11 +6611,12 @@ namespace {
&ZenFolderPath,
&Storage,
BuildId = Oid(BuildId),
- &PrimeCacheOnly,
+ PrimeCacheOnly,
&RemoteContent,
&RemoteLookup,
&ExistsResult,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&NetworkPool,
@@ -6654,6 +6658,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(BuildBlob),
@@ -6683,6 +6688,7 @@ namespace {
BuildId,
PrimeCacheOnly,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
ChunkHash,
@@ -6718,6 +6724,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(Payload),
@@ -6763,6 +6770,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(BuildBlob),
@@ -6800,6 +6808,7 @@ namespace {
&CacheFolderPath,
&LocalLookup,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&FilteredWrittenBytesPerSecond,
@@ -6892,8 +6901,9 @@ namespace {
tsl::robin_set<uint32_t> ChunkIndexesWritten;
- BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
- WriteFileCache OpenFileCache(DiskStats);
+ BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
{
if (AbortFlag)
@@ -6940,18 +6950,17 @@ namespace {
}
CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
+
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunk(CacheFolderPath,
+ RemoteContent,
+ LocalWriter,
+ ChunkSource,
+ RemoteSequenceIndex,
+ FileOffset,
+ RemotePathIndex,
+ DiskStats);
CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
@@ -6960,7 +6969,7 @@ namespace {
}
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
std::vector<uint32_t> CompletedChunkSequences;
for (const WriteOp& Op : WriteOps)
{
@@ -6970,6 +6979,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ WriteCache.Close(CompletedChunkSequences);
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
RemoteContent,
RemoteLookup,
@@ -7003,6 +7013,7 @@ namespace {
&RemoteLookup,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&BlockDescriptions,
@@ -7038,6 +7049,7 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7080,6 +7092,7 @@ namespace {
&RemoteContent,
&SequenceIndexChunksLeftToWriteCounters,
&ExistsResult,
+ &WriteCache,
&FilteredDownloadedBytesPerSecond,
TotalRequestCount,
&WritePartsComplete,
@@ -7185,6 +7198,7 @@ namespace {
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
&WritePartsComplete,
+ &WriteCache,
&Work,
TotalPartWriteCount,
&WritePool,
@@ -7230,6 +7244,7 @@ namespace {
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7284,6 +7299,7 @@ namespace {
&WritePool,
&RemoteContent,
&RemoteLookup,
+ &WriteCache,
&CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
@@ -7388,6 +7404,7 @@ namespace {
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
&BlockDescriptions,
+ &WriteCache,
&WriteChunkStats,
&DiskStats,
&WritePartsComplete,
@@ -7429,6 +7446,7 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7753,6 +7771,7 @@ namespace {
Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("FinalizeTree_Remove");
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
SetFileReadOnlyWithRetry(LocalFilePath, false);
RemoveFileWithRetry(LocalFilePath);
@@ -8815,7 +8834,7 @@ namespace {
if (!ChunkController)
{
ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
- ChunkController = CreateBasicChunkingController();
+ ChunkController = CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
}
LocalContent = GetLocalContent(LocalFolderScanStats,
@@ -8899,7 +8918,10 @@ namespace {
{
BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path);
+
+ uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0));
+
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize));
FolderContent LocalFolderState;
DiskStatistics DiskStats;
@@ -9037,8 +9059,9 @@ namespace {
ChunkedFolderContent CompareFolderContent;
{
- std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController();
- std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions;
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
+ std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions;
if (OnlyChunked)
{
ExcludeExtensions.insert(ExcludeExtensions.end(),
@@ -10717,7 +10740,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ExtendableStringBuilder<256> SB;
CompactBinaryToJson(MetaData, SB);
- ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
+ ZEN_CONSOLE("Upload Build {}, Part {} ({}) from '{}'\n{}", m_BuildId, BuildPartId, m_BuildPartName, m_Path, SB.ToView());
}
const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path);
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index 993f2b616..6989da67e 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -283,7 +283,7 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec)
void
BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
{
- const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024;
WriteFile(m_FileHandle, Data, Size, FileOffset, MaxChunkSize, Ec);
}
@@ -794,7 +794,7 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path)
{
uint64_t Offset = 0;
static const uint64_t BufferingSize = 256u * 1024u;
- // BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2);
+ BasicFileWriter BufferedOutput(Temp, Min(BufferingSize, BufferSize));
for (const SharedBuffer& Segment : Buffer.GetSegments())
{
size_t SegmentSize = Segment.GetSize();
@@ -806,14 +806,14 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path)
FileRef.FileChunkOffset,
FileRef.FileChunkSize,
BufferingSize,
- [&Temp, &Offset](const void* Data, size_t Size) {
- Temp.Write(Data, Size, Offset);
+ [&BufferedOutput, &Offset](const void* Data, size_t Size) {
+ BufferedOutput.Write(Data, Size, Offset);
Offset += Size;
});
}
else
{
- Temp.Write(Segment.GetData(), SegmentSize, Offset);
+ BufferedOutput.Write(Segment.GetData(), SegmentSize, Offset);
Offset += SegmentSize;
}
}
diff --git a/src/zencore/blake3.cpp b/src/zencore/blake3.cpp
index 4a77aa49a..054f0d3a0 100644
--- a/src/zencore/blake3.cpp
+++ b/src/zencore/blake3.cpp
@@ -151,6 +151,28 @@ BLAKE3Stream::Append(const void* data, size_t byteCount)
return *this;
}
+BLAKE3Stream&
+BLAKE3Stream::Append(const IoBuffer& Buffer)
+{
+ blake3_hasher* b3h = reinterpret_cast<blake3_hasher*>(m_HashState);
+
+ size_t BufferSize = Buffer.GetSize();
+ static const uint64_t BufferingSize = 256u * 1024u;
+ IoBufferFileReference FileRef;
+ if (BufferSize >= (BufferingSize + BufferingSize / 2) && Buffer.GetFileReference(FileRef))
+ {
+ ScanFile(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, [&b3h](const void* Data, size_t Size) {
+ blake3_hasher_update(b3h, Data, Size);
+ });
+ }
+ else
+ {
+ blake3_hasher_update(b3h, Buffer.GetData(), BufferSize);
+ }
+
+ return *this;
+}
+
BLAKE3
BLAKE3Stream::GetHash()
{
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 62b64bc9d..d9f381811 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -216,23 +216,45 @@ public:
std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
uint64_t /* BlockSize */) const final
{
- UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
- Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
+ const uint64_t HeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder();
- IoBufferFileReference FileRef = {nullptr, 0, 0};
- if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
+ uint64_t RawOffset = 0;
+ BLAKE3Stream HashStream;
+
+ for (const SharedBuffer& Segment : RawData.GetSegments())
{
- ZEN_ASSERT(FileRef.FileHandle != nullptr);
- uint64_t CallbackOffset = 0;
- ScanFile(FileRef.FileHandle, 0, RawData.GetSize(), 512u * 1024u, [&](const void* Data, size_t Size) {
- CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size)));
- Callback(CallbackOffset, Size, HeaderData.GetSize() + CallbackOffset, Tmp);
- CallbackOffset += Size;
- });
- return true;
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ IoBuffer SegmentBuffer = Segment.AsIoBuffer();
+ if (SegmentBuffer.GetFileReference(FileRef))
+ {
+ ZEN_ASSERT(FileRef.FileHandle != nullptr);
+
+ ScanFile(FileRef.FileHandle,
+ FileRef.FileChunkOffset,
+ FileRef.FileChunkSize,
+ 512u * 1024u,
+ [&](const void* Data, size_t Size) {
+ HashStream.Append(Data, Size);
+ CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size));
+ Callback(RawOffset, Size, HeaderSize + RawOffset, Tmp);
+ RawOffset += Size;
+ });
+ }
+ else
+ {
+ const uint64_t Size = SegmentBuffer.GetSize();
+ HashStream.Append(SegmentBuffer);
+ Callback(RawOffset, Size, HeaderSize + RawOffset, CompositeBuffer(Segment));
+ RawOffset += Size;
+ }
}
- Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData);
+ ZEN_ASSERT(RawOffset == RawData.GetSize());
+
+ UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), HashStream.GetHash());
+ ZEN_ASSERT(HeaderData.GetSize() == HeaderSize);
+ Callback(0, 0, 0, CompositeBuffer(HeaderData.MoveToShared()));
+
return true;
}
};
@@ -323,7 +345,7 @@ public:
ScanFile(FileRef.FileHandle, sizeof(BufferHeader) + RawOffset, RawSize, 512u * 1024u, [&](const void* Data, size_t Size) {
if (Result)
{
- CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size)));
+ CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size));
Result = Callback(sizeof(BufferHeader) + RawOffset + CallbackOffset, Size, CallbackOffset, Tmp);
}
CallbackOffset += Size;
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 0a9b2a73a..c4264bc29 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2275,23 +2275,32 @@ PrepareFileForScatteredWrite(void* FileHandle, uint64_t FinalSize)
{
bool Result = true;
#if ZEN_PLATFORM_WINDOWS
- DWORD _ = 0;
- BOOL Ok = DeviceIoControl(FileHandle, FSCTL_SET_SPARSE, nullptr, 0, nullptr, 0, &_, nullptr);
- if (!Ok)
+
+ BY_HANDLE_FILE_INFORMATION Information;
+ if (GetFileInformationByHandle(FileHandle, &Information))
{
- std::error_code DummyEc;
- ZEN_DEBUG("Unable to set sparse mode for file '{}'", PathFromHandle(FileHandle, DummyEc));
- Result = false;
+ if ((Information.dwFileAttributes & FILE_ATTRIBUTE_SPARSE_FILE) == 0)
+ {
+ DWORD _ = 0;
+ BOOL Ok = DeviceIoControl(FileHandle, FSCTL_SET_SPARSE, nullptr, 0, nullptr, 0, &_, nullptr);
+ if (!Ok)
+ {
+ std::error_code DummyEc;
+ ZEN_DEBUG("Unable to set sparse mode for file '{}'", PathFromHandle(FileHandle, DummyEc));
+ Result = false;
+ }
+ }
}
FILE_ALLOCATION_INFO AllocationInfo = {};
- AllocationInfo.AllocationSize.QuadPart = FinalSize;
+ AllocationInfo.AllocationSize.QuadPart = LONGLONG(FinalSize);
if (!SetFileInformationByHandle(FileHandle, FileAllocationInfo, &AllocationInfo, DWORD(sizeof(AllocationInfo))))
{
std::error_code DummyEc;
ZEN_DEBUG("Unable to set file allocation size to {} for file '{}'", FinalSize, PathFromHandle(FileHandle, DummyEc));
Result = false;
}
+
#else // ZEN_PLATFORM_WINDOWS
ZEN_UNUSED(FileHandle, FinalSize);
#endif // ZEN_PLATFORM_WINDOWS
diff --git a/src/zencore/include/zencore/blake3.h b/src/zencore/include/zencore/blake3.h
index 28bb348c0..f01e45266 100644
--- a/src/zencore/include/zencore/blake3.h
+++ b/src/zencore/include/zencore/blake3.h
@@ -53,6 +53,7 @@ struct BLAKE3Stream
void Reset(); // Begin streaming hash compute (not needed on freshly constructed instance)
BLAKE3Stream& Append(const void* data, size_t byteCount); // Append another chunk
BLAKE3Stream& Append(MemoryView DataView) { return Append(DataView.GetData(), DataView.GetSize()); } // Append another chunk
+ BLAKE3Stream& Append(const IoBuffer& Buffer); // Append another chunk
BLAKE3 GetHash(); // Obtain final hash. If you wish to reuse the instance call reset()
private:
diff --git a/src/zencore/include/zencore/iohash.h b/src/zencore/include/zencore/iohash.h
index 7443e17b7..a619b0053 100644
--- a/src/zencore/include/zencore/iohash.h
+++ b/src/zencore/include/zencore/iohash.h
@@ -102,6 +102,12 @@ struct IoHashStream
return *this;
}
+ IoHashStream& Append(const IoBuffer& Buffer)
+ {
+ m_Blake3Stream.Append(Buffer);
+ return *this;
+ }
+
/// Append another chunk
IoHashStream& Append(MemoryView Data)
{
diff --git a/src/zenutil/bufferedwritefilecache.cpp b/src/zenutil/bufferedwritefilecache.cpp
new file mode 100644
index 000000000..a52850314
--- /dev/null
+++ b/src/zenutil/bufferedwritefilecache.cpp
@@ -0,0 +1,177 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/bufferedwritefilecache.h>
+
+#include <zencore/logging.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+BufferedWriteFileCache::BufferedWriteFileCache() : m_CacheHitCount(0), m_CacheMissCount(0), m_OpenHandleCount(0), m_DroppedHandleCount(0)
+{
+}
+
+BufferedWriteFileCache::~BufferedWriteFileCache()
+{
+ ZEN_TRACE_CPU("~BufferedWriteFileCache()");
+
+ try
+ {
+ for (TOpenHandles& OpenHandles : m_OpenFiles)
+ {
+ while (BasicFile* File = OpenHandles.Pop())
+ {
+ std::unique_ptr<BasicFile> FileToClose(File);
+ m_OpenHandleCount--;
+ }
+ }
+ m_OpenFiles.clear();
+ m_ChunkWriters.clear();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BufferedWriteFileCache() threw exeption: {}", Ex.what());
+ }
+}
+
+std::unique_ptr<BasicFile>
+BufferedWriteFileCache::Get(uint32_t FileIndex)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Get");
+
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ if (BasicFile* File = OpenHandles.Pop(); File != nullptr)
+ {
+ m_OpenHandleCount--;
+ m_CacheHitCount++;
+ return std::unique_ptr<BasicFile>(File);
+ }
+ }
+ m_CacheMissCount++;
+ return nullptr;
+}
+
+void
+BufferedWriteFileCache::Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Put");
+
+ if (m_OpenHandleCount.load() >= MaxBufferedCount)
+ {
+ m_DroppedHandleCount++;
+ return;
+ }
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ if (OpenHandles.Push(Writer.get()))
+ {
+ Writer.release();
+ m_OpenHandleCount++;
+ }
+ else
+ {
+ m_DroppedHandleCount++;
+ }
+ }
+ else
+ {
+ const uint32_t HandleIndex = gsl::narrow<uint32_t>(m_OpenFiles.size());
+ m_OpenFiles.push_back(TOpenHandles{});
+ m_OpenFiles.back().Push(Writer.release());
+ m_ChunkWriters.insert_or_assign(FileIndex, HandleIndex);
+ m_OpenHandleCount++;
+ }
+}
+
+void
+BufferedWriteFileCache::Close(std::span<uint32_t> FileIndexes)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Close");
+
+ std::vector<std::unique_ptr<BasicFile>> FilesToClose;
+ FilesToClose.reserve(FileIndexes.size());
+ {
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ for (uint32_t FileIndex : FileIndexes)
+ {
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ while (BasicFile* File = OpenHandles.Pop())
+ {
+ FilesToClose.emplace_back(std::unique_ptr<BasicFile>(File));
+ m_OpenHandleCount--;
+ }
+ m_ChunkWriters.erase(It);
+ }
+ }
+ }
+ FilesToClose.clear();
+}
+
+BufferedWriteFileCache::Local::Local(BufferedWriteFileCache& Cache) : m_Cache(Cache)
+{
+}
+
+BufferedWriteFileCache::Local::Writer*
+BufferedWriteFileCache::Local::GetWriter(uint32_t FileIndex)
+{
+ if (auto It = m_FileIndexToWriterIndex.find(FileIndex); It != m_FileIndexToWriterIndex.end())
+ {
+ return m_ChunkWriters[It->second].get();
+ }
+ std::unique_ptr<BasicFile> File = m_Cache.Get(FileIndex);
+ if (File)
+ {
+ const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size());
+ m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex);
+ m_ChunkWriters.emplace_back(std::make_unique<Writer>(Writer{.File = std::move(File)}));
+ return m_ChunkWriters.back().get();
+ }
+ return nullptr;
+}
+
+BufferedWriteFileCache::Local::Writer*
+BufferedWriteFileCache::Local::PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer)
+{
+ ZEN_ASSERT(!m_FileIndexToWriterIndex.contains(FileIndex));
+ const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size());
+ m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex);
+ m_ChunkWriters.emplace_back(std::move(Writer));
+ return m_ChunkWriters.back().get();
+}
+
+BufferedWriteFileCache::Local::~Local()
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::~Local()");
+ try
+ {
+ for (auto& It : m_FileIndexToWriterIndex)
+ {
+ const uint32_t FileIndex = It.first;
+ const uint32_t WriterIndex = It.second;
+ m_ChunkWriters[WriterIndex]->Writer.reset();
+ std::unique_ptr<BasicFile> File;
+ File.swap(m_ChunkWriters[WriterIndex]->File);
+ m_Cache.Put(FileIndex, std::move(File));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("BufferedWriteFileCache::~Local() threw exeption: {}", Ex.what());
+ }
+}
+
+} // namespace zen
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index ae129324e..4bec4901a 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -891,8 +891,12 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
}
Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1);
+ Result.PathExtensionHash.resize(Content.Paths.size());
for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++)
{
+ std::string LowercaseExtension = Content.Paths[PathIndex].extension().string();
+ std::transform(LowercaseExtension.begin(), LowercaseExtension.end(), LowercaseExtension.begin(), ::tolower);
+ Result.PathExtensionHash[PathIndex] = HashStringDjb2(LowercaseExtension);
if (Content.RawSizes[PathIndex] > 0)
{
const IoHash& RawHash = Content.RawHashes[PathIndex];
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
index a5ebce193..6fb4182c0 100644
--- a/src/zenutil/chunkingcontroller.cpp
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -4,6 +4,7 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
#include <zencore/trace.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -35,32 +36,54 @@ namespace {
return ChunkedParams{.UseThreshold = UseThreshold, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize};
}
-} // namespace
+ void WriteChunkParams(CbObjectWriter& Writer, const ChunkedParams& Params)
+ {
+ Writer.BeginObject("ChunkingParams"sv);
+ {
+ Writer.AddBool("UseThreshold"sv, Params.UseThreshold);
-class BasicChunkingController : public ChunkingController
-{
-public:
- BasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
- bool ExcludeElfFiles,
- bool ExcludeMachOFiles,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams)
- : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end())
- , m_ExcludeElfFiles(ExcludeElfFiles)
- , m_ExcludeMachOFiles(ExcludeMachOFiles)
- , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
- , m_ChunkingParams(ChunkingParams)
+ Writer.AddInteger("MinSize"sv, (uint64_t)Params.MinSize);
+ Writer.AddInteger("MaxSize"sv, (uint64_t)Params.MaxSize);
+ Writer.AddInteger("AvgSize"sv, (uint64_t)Params.AvgSize);
+ }
+ Writer.EndObject(); // ChunkingParams
+ }
+
+ bool IsElfFile(BasicFile& Buffer)
{
+ if (Buffer.FileSize() > 4)
+ {
+ uint32_t ElfCheck = 0;
+ Buffer.Read(&ElfCheck, 4, 0);
+ if (ElfCheck == 0x464c457f)
+ {
+ return true;
+ }
+ }
+ return false;
}
- BasicChunkingController(CbObjectView Parameters)
- : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()))
- , m_ExcludeElfFiles(Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles))
- , m_ExcludeMachOFiles(Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles))
- , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
- , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
+ bool IsMachOFile(BasicFile& Buffer)
{
+ if (Buffer.FileSize() > 4)
+ {
+ uint32_t MachOCheck = 0;
+ Buffer.Read(&MachOCheck, 4, 0);
+ if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe))
+ {
+ return true;
+ }
+ }
+ return false;
}
+} // namespace
+
+class BasicChunkingController : public ChunkingController
+{
+public:
+ BasicChunkingController(const BasicChunkingControllerSettings& Settings) : m_Settings(Settings) {}
+
+ BasicChunkingController(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {}
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
@@ -70,35 +93,25 @@ public:
{
ZEN_TRACE_CPU("BasicChunkingController::ProcessFile");
const bool ExcludeFromChunking =
- std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) !=
- m_ChunkExcludeExtensions.end();
+ std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) !=
+ m_Settings.ExcludeExtensions.end();
- if (ExcludeFromChunking || (RawSize < m_ChunkFileSizeLimit))
+ if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit))
{
return false;
}
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
- if (m_ExcludeElfFiles && Buffer.FileSize() > 4)
+ if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer))
{
- uint32_t ElfCheck = 0;
- Buffer.Read(&ElfCheck, 4, 0);
- if (ElfCheck == 0x464c457f)
- {
- return false;
- }
+ return false;
}
- if (m_ExcludeMachOFiles && Buffer.FileSize() > 4)
+ if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer))
{
- uint32_t MachOCheck = 0;
- Buffer.Read(&MachOCheck, 4, 0);
- if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe))
- {
- return false;
- }
+ return false;
}
- OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
+ OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
@@ -109,59 +122,43 @@ public:
CbObjectWriter Writer;
Writer.BeginArray("ChunkExcludeExtensions"sv);
{
- for (const std::string& Extension : m_ChunkExcludeExtensions)
+ for (const std::string& Extension : m_Settings.ExcludeExtensions)
{
Writer.AddString(Extension);
}
}
Writer.EndArray(); // ChunkExcludeExtensions
- Writer.AddBool("ExcludeElfFiles"sv, m_ExcludeElfFiles);
- Writer.AddBool("ExcludeMachOFiles"sv, m_ExcludeMachOFiles);
+ Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles);
+ Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles);
+ Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit);
- Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
- Writer.BeginObject("ChunkingParams"sv);
- {
- Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
+ WriteChunkParams(Writer, m_Settings.ChunkingParams);
- Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
- Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
- Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
- }
- Writer.EndObject(); // ChunkingParams
return Writer.Save();
}
static constexpr std::string_view Name = "BasicChunkingController"sv;
-protected:
- const std::vector<std::string> m_ChunkExcludeExtensions;
- const bool m_ExcludeElfFiles = false;
- const bool m_ExcludeMachOFiles = false;
- const uint64_t m_ChunkFileSizeLimit;
- const ChunkedParams m_ChunkingParams;
+private:
+ static BasicChunkingControllerSettings ReadSettings(CbObjectView Parameters)
+ {
+ return BasicChunkingControllerSettings{
+ .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()),
+ .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles),
+ .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles),
+ .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit),
+ .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())};
+ }
+
+ const BasicChunkingControllerSettings m_Settings;
};
class ChunkingControllerWithFixedChunking : public ChunkingController
{
public:
- ChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams,
- uint32_t FixedChunkingChunkSize)
- : m_FixedChunkingExtensions(FixedChunkingExtensions.begin(), FixedChunkingExtensions.end())
- , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
- , m_ChunkingParams(ChunkingParams)
- , m_FixedChunkingChunkSize(FixedChunkingChunkSize)
- {
- }
+ ChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Settings) : m_Settings(Settings) {}
- ChunkingControllerWithFixedChunking(CbObjectView Parameters)
- : m_FixedChunkingExtensions(ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()))
- , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
- , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
- , m_FixedChunkingChunkSize(Parameters["FixedChunkingChunkSize"sv].AsUInt32(16u * 1024u * 1024u))
- {
- }
+ ChunkingControllerWithFixedChunking(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {}
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
@@ -170,33 +167,71 @@ public:
std::atomic<bool>& AbortFlag) const override
{
ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile");
- if (RawSize < m_ChunkFileSizeLimit)
+ const bool ExcludeFromChunking =
+ std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) !=
+ m_Settings.ExcludeExtensions.end();
+
+ if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit))
{
return false;
}
- const bool FixedChunking = std::find(m_FixedChunkingExtensions.begin(), m_FixedChunkingExtensions.end(), InputPath.extension()) !=
- m_FixedChunkingExtensions.end();
- if (FixedChunking)
+ const bool FixedChunkingExtension =
+ std::find(m_Settings.FixedChunkingExtensions.begin(), m_Settings.FixedChunkingExtensions.end(), InputPath.extension()) !=
+ m_Settings.FixedChunkingExtensions.end();
+
+ if (FixedChunkingExtension)
{
+ if (RawSize < m_Settings.MinSizeForFixedChunking)
+ {
+ return false;
+ }
ZEN_TRACE_CPU("FixedChunking");
- IoHashStream FullHash;
- IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath);
+ IoHashStream FullHasher;
+ BasicFile Source(InputPath, BasicFile::Mode::kRead);
uint64_t Offset = 0;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize));
+ const uint64_t ExpectedChunkCount = 1 + (RawSize / m_Settings.FixedChunkingChunkSize);
+ ChunkHashToChunkIndex.reserve(ExpectedChunkCount);
+ OutChunked.Info.ChunkHashes.reserve(ExpectedChunkCount);
+ OutChunked.Info.ChunkSequence.reserve(ExpectedChunkCount);
+ OutChunked.ChunkSources.reserve(ExpectedChunkCount);
+
+ static const uint64_t BufferingSize = 256u * 1024u;
+
+ IoHashStream ChunkHasher;
+
while (Offset < RawSize)
{
if (AbortFlag)
{
return false;
}
- uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize);
- IoBuffer Chunk(Source, Offset, ChunkSize);
- MemoryView ChunkData = Chunk.GetView();
- FullHash.Append(ChunkData);
- IoHash ChunkHash = IoHash::HashBuffer(ChunkData);
+ ChunkHasher.Reset();
+
+ uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_Settings.FixedChunkingChunkSize);
+ if (ChunkSize >= (BufferingSize + BufferingSize / 2))
+ {
+ ScanFile(Source.Handle(),
+ Offset,
+ ChunkSize,
+ BufferingSize,
+ [&FullHasher, &ChunkHasher, &BytesProcessed](const void* Data, size_t Size) {
+ FullHasher.Append(Data, Size);
+ ChunkHasher.Append(Data, Size);
+ BytesProcessed.fetch_add(Size);
+ });
+ }
+ else
+ {
+ IoBuffer ChunkData = Source.ReadRange(Offset, ChunkSize);
+ FullHasher.Append(ChunkData);
+ ChunkHasher.Append(ChunkData);
+ BytesProcessed.fetch_add(ChunkSize);
+ }
+
+ const IoHash ChunkHash = ChunkHasher.GetHash();
if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end())
{
OutChunked.Info.ChunkSequence.push_back(It->second);
@@ -209,16 +244,24 @@ public:
OutChunked.ChunkSources.push_back({.Offset = Offset, .Size = gsl::narrow<uint32_t>(ChunkSize)});
}
Offset += ChunkSize;
- BytesProcessed.fetch_add(ChunkSize);
}
OutChunked.Info.RawSize = RawSize;
- OutChunked.Info.RawHash = FullHash.GetHash();
+ OutChunked.Info.RawHash = FullHasher.GetHash();
return true;
}
else
{
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
- OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed);
+ if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer))
+ {
+ return false;
+ }
+ if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer))
+ {
+ return false;
+ }
+
+ OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
}
@@ -230,47 +273,57 @@ public:
CbObjectWriter Writer;
Writer.BeginArray("FixedChunkingExtensions");
{
- for (const std::string& Extension : m_FixedChunkingExtensions)
+ for (const std::string& Extension : m_Settings.FixedChunkingExtensions)
{
Writer.AddString(Extension);
}
}
Writer.EndArray(); // ChunkExcludeExtensions
- Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
- Writer.BeginObject("ChunkingParams"sv);
- {
- Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
- Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
- Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
- Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
+ Writer.BeginArray("ChunkExcludeExtensions"sv);
+ {
+ for (const std::string& Extension : m_Settings.ExcludeExtensions)
+ {
+ Writer.AddString(Extension);
+ }
}
- Writer.EndObject(); // ChunkingParams
- Writer.AddInteger("FixedChunkingChunkSize"sv, m_FixedChunkingChunkSize);
+ Writer.EndArray(); // ChunkExcludeExtensions
+
+ Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles);
+ Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles);
+
+ Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit);
+
+ WriteChunkParams(Writer, m_Settings.ChunkingParams);
+
+ Writer.AddInteger("FixedChunkingChunkSize"sv, m_Settings.FixedChunkingChunkSize);
+ Writer.AddInteger("MinSizeForFixedChunking"sv, m_Settings.MinSizeForFixedChunking);
return Writer.Save();
}
static constexpr std::string_view Name = "ChunkingControllerWithFixedChunking"sv;
-protected:
- const std::vector<std::string> m_FixedChunkingExtensions;
- const uint64_t m_ChunkFileSizeLimit;
- const ChunkedParams m_ChunkingParams;
- const uint32_t m_FixedChunkingChunkSize;
+private:
+ static ChunkingControllerWithFixedChunkingSettings ReadSettings(CbObjectView Parameters)
+ {
+ return ChunkingControllerWithFixedChunkingSettings{
+ .FixedChunkingExtensions = ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()),
+ .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()),
+ .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles),
+ .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles),
+ .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit),
+ .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()),
+ .FixedChunkingChunkSize = Parameters["FixedChunkingChunkSize"sv].AsUInt64(DefaultFixedChunkingChunkSize),
+ .MinSizeForFixedChunking = Parameters["MinSizeForFixedChunking"sv].AsUInt64(DefaultFixedChunkingChunkSize)};
+ }
+
+ const ChunkingControllerWithFixedChunkingSettings m_Settings;
};
std::unique_ptr<ChunkingController>
-CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
- bool ExcludeElfFiles,
- bool ExcludeMachOFiles,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams)
+CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings)
{
- return std::make_unique<BasicChunkingController>(ExcludeExtensions,
- ExcludeElfFiles,
- ExcludeMachOFiles,
- ChunkFileSizeLimit,
- ChunkingParams);
+ return std::make_unique<BasicChunkingController>(Settings);
}
std::unique_ptr<ChunkingController>
CreateBasicChunkingController(CbObjectView Parameters)
@@ -279,15 +332,9 @@ CreateBasicChunkingController(CbObjectView Parameters)
}
std::unique_ptr<ChunkingController>
-CreateChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams,
- uint32_t FixedChunkingChunkSize)
+CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting)
{
- return std::make_unique<ChunkingControllerWithFixedChunking>(FixedChunkingExtensions,
- ChunkFileSizeLimit,
- ChunkingParams,
- FixedChunkingChunkSize);
+ return std::make_unique<ChunkingControllerWithFixedChunking>(Setting);
}
std::unique_ptr<ChunkingController>
CreateChunkingControllerWithFixedChunking(CbObjectView Parameters)
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index badfb4840..c389d16c5 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -678,13 +678,24 @@ protected:
{
return false;
}
- CompositeBuffer Decompressed = ValidateBuffer.DecompressToComposite();
- if (!Decompressed)
+
+ IoHashStream Hash;
+ bool CouldDecompress = ValidateBuffer.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
+ });
+ if (!CouldDecompress)
{
return false;
}
- IoHash Hash = IoHash::HashBuffer(Decompressed);
- if (Hash != RawHash)
+ if (Hash.GetHash() != VerifyHash)
{
return false;
}
diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h
new file mode 100644
index 000000000..68d6c375e
--- /dev/null
+++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class CompositeBuffer;
+
+class BufferedWriteFileCache
+{
+public:
+ BufferedWriteFileCache(const BufferedWriteFileCache&) = delete;
+ BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete;
+
+ BufferedWriteFileCache();
+
+ ~BufferedWriteFileCache();
+
+ std::unique_ptr<BasicFile> Get(uint32_t FileIndex);
+
+ void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer);
+
+ void Close(std::span<uint32_t> FileIndexes);
+
+ class Local
+ {
+ public:
+ struct Writer
+ {
+ std::unique_ptr<BasicFile> File;
+ std::unique_ptr<BasicFileWriter> Writer;
+
+ inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset)
+ {
+ if (Writer)
+ {
+ Writer->Write(Chunk, FileOffset);
+ }
+ else
+ {
+ File->Write(Chunk, FileOffset);
+ }
+ }
+ };
+
+ Local(const Local&) = delete;
+ Local& operator=(const Local&) = delete;
+
+ explicit Local(BufferedWriteFileCache& Cache);
+ ~Local();
+
+ Writer* GetWriter(uint32_t FileIndex);
+ Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer);
+
+ private:
+ tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex;
+ std::vector<std::unique_ptr<Writer>> m_ChunkWriters;
+ BufferedWriteFileCache& m_Cache;
+ };
+
+private:
+ static constexpr size_t MaxHandlesPerPath = 7;
+ static constexpr size_t MaxBufferedCount = 1024;
+ struct TOpenHandles
+ {
+ BasicFile* Files[MaxHandlesPerPath];
+ uint64_t Size = 0;
+ inline BasicFile* Pop()
+ {
+ if (Size > 0)
+ {
+ return Files[--Size];
+ }
+ else
+ {
+ return nullptr;
+ }
+ }
+ inline bool Push(BasicFile* File)
+ {
+ if (Size < MaxHandlesPerPath)
+ {
+ Files[Size++] = File;
+ return true;
+ }
+ return false;
+ }
+ };
+ static_assert(sizeof(TOpenHandles) == 64);
+
+ RwLock m_WriterLock;
+ tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters;
+ std::vector<TOpenHandles> m_OpenFiles;
+ std::atomic<uint32_t> m_CacheHitCount;
+ std::atomic<uint32_t> m_CacheMissCount;
+ std::atomic<uint32_t> m_OpenHandleCount;
+ std::atomic<uint32_t> m_DroppedHandleCount;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index d33869be2..03f52e5f6 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -135,6 +135,7 @@ struct ChunkedContentLookup
ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
+ std::vector<uint32_t> PathExtensionHash;
};
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index 970917fb0..315502265 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -11,9 +11,11 @@
namespace zen {
-const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"};
-const bool DefaultChunkingExcludeElfFiles = true;
-const bool DefaultChunkingExcludeMachOFiles = true;
+const std::vector<std::string> DefaultChunkingExcludeExtensions =
+ {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"};
+const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"};
+const bool DefaultChunkingExcludeElfFiles = true;
+const bool DefaultChunkingExcludeMachOFiles = true;
const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u,
.MaxSize = 128u * 1024u,
@@ -21,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128
const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize;
-const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u;
+const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u;
+const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u;
struct ChunkedInfoWithSource;
@@ -40,19 +43,31 @@ public:
virtual CbObject GetParameters() const = 0;
};
-std::unique_ptr<ChunkingController> CreateBasicChunkingController(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles,
- bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams);
+struct BasicChunkingControllerSettings
+{
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+};
+
+std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings);
std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters);
-std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams,
- uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize);
+struct ChunkingControllerWithFixedChunkingSettings
+{
+ std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions;
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+ uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize;
+ uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking;
+};
+
+std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting);
std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters);
std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters);