aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-03 16:21:01 +0200
committerGitHub Enterprise <[email protected]>2025-06-03 16:21:01 +0200
commita0b10b046095d57ffbdb46c83084601a832f4562 (patch)
treefe015645ea07d83c2784e3e28d0e976a37054859 /src/zen/cmds/builds_cmd.cpp
parentminor: fix unused variable warning on some compilers (diff)
downloadarchived-zen-a0b10b046095d57ffbdb46c83084601a832f4562.tar.xz
archived-zen-a0b10b046095d57ffbdb46c83084601a832f4562.zip
fixed size chunking for encrypted files (#410)
- Improvement: Use fixed size block chunking for know encrypted/compressed file types - Improvement: Skip trying to compress chunks that are sourced from files that are known to be encrypted/compressed - Improvement: Add global open file cache for written files increasing throughput during download by reducing overhead of open/close of file by 80%
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp549
1 files changed, 286 insertions, 263 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index f6f15acb0..e13c90b4b 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -22,6 +22,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenutil/bufferedwritefilecache.h>
#include <zenutil/buildstoragecache.h>
#include <zenutil/chunkblock.h>
#include <zenutil/chunkedcontent.h>
@@ -105,6 +106,40 @@ namespace {
}
WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); }
+ static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv),
+ HashStringDjb2(".zip"sv),
+ HashStringDjb2(".7z"sv),
+ HashStringDjb2(".bzip"sv),
+ HashStringDjb2(".rar"sv),
+ HashStringDjb2(".gzip"sv),
+ HashStringDjb2(".apk"sv),
+ HashStringDjb2(".nsp"sv),
+ HashStringDjb2(".xvc"sv),
+ HashStringDjb2(".pkg"sv),
+ HashStringDjb2(".dmg"sv),
+ HashStringDjb2(".ipa"sv)});
+
+ static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end());
+
+ static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); }
+
+ static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+ {
+ ZEN_UNUSED(Content);
+ const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
+ if (ChunkLocationCount == 0)
+ {
+ return false;
+ }
+ const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
+ const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex];
+
+ const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash);
+ return IsCompressable;
+ }
+
const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u;
const std::string ZenFolderName = ".zen";
@@ -313,7 +348,10 @@ namespace {
std::atomic<uint64_t>& WriteByteCount)
{
BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate);
- PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ }
uint64_t Offset = 0;
if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) {
TargetFile.Write(Data, Size, Offset);
@@ -600,24 +638,6 @@ namespace {
return AuthToken;
}
- bool IsBufferDiskBased(const IoBuffer& Buffer)
- {
- IoBufferFileReference FileRef;
- if (Buffer.GetFileReference(FileRef))
- {
- return true;
- }
- return false;
- }
-
- bool IsBufferDiskBased(const CompositeBuffer& Buffer)
- {
- // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
- std::span<const SharedBuffer> Segments = Buffer.GetSegments();
- ZEN_ASSERT(Buffer.GetSegments().size() > 0);
- return IsBufferDiskBased(Segments.back().AsIoBuffer());
- }
-
IoBuffer WriteToTempFile(CompositeBuffer&& Buffer,
const std::filesystem::path& TempFolderPath,
const IoHash& Hash,
@@ -1729,16 +1749,13 @@ namespace {
{
ZEN_ASSERT(false);
}
- uint64_t RawSize = Chunk.GetSize();
- if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
- {
- // Standalone chunk, not part of a sequence
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)};
- }
- else
- {
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
- }
+ uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= MinimumSizeForCompressInBlock) &&
+ IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
}));
}
@@ -1768,18 +1785,15 @@ namespace {
Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
- const uint64_t RawSize = Chunk.GetSize();
- if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
- {
- CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
- }
- else
- {
- CompositeBuffer CompressedChunk =
- CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
- }
+ const uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
}
return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
};
@@ -2253,7 +2267,11 @@ namespace {
{
throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
}
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
+
+ const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ if (ShouldCompressChunk)
{
std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString();
@@ -2266,6 +2284,9 @@ namespace {
fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
}
+ uint64_t StreamRawBytes = 0;
+ uint64_t StreamCompressedBytes = 0;
+
bool CouldCompress = CompressedBuffer::CompressToStream(
CompositeBuffer(SharedBuffer(RawSource)),
[&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
@@ -2273,7 +2294,11 @@ namespace {
LooseChunksStats.CompressedChunkRawBytes += SourceSize;
CompressedFile.Write(RangeBuffer, Offset);
LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
- });
+ StreamRawBytes += SourceSize;
+ StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ OodleCompressor::Mermaid,
+ CompressionLevel);
if (CouldCompress)
{
uint64_t CompressedSize = CompressedFile.FileSize();
@@ -2296,22 +2321,36 @@ namespace {
return Compressed.GetCompressed();
}
+ else
+ {
+ LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes;
+ LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
+ }
CompressedFile.Close();
RemoveFile(TempFilePath, Ec);
ZEN_UNUSED(Ec);
}
- // Try regular compress - decompress may fail if compressed data is larger than non-compressed
- CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)));
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
if (!CompressedBlob)
{
throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
}
- if (!IsBufferDiskBased(CompressedBlob.GetCompressed()))
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
+
+ LooseChunksStats.CompressedChunkRawBytes += ChunkSize;
+ LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
+
+ // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk
+ if (ShouldCompressChunk)
{
IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash);
CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
}
+
+ LooseChunksStats.CompressedChunkCount++;
return std::move(CompressedBlob).GetCompressed();
}
@@ -3431,7 +3470,8 @@ namespace {
LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
}
- std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController();
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
{
CbObjectWriter ChunkParametersWriter;
ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
@@ -4399,106 +4439,6 @@ namespace {
}
}
- class WriteFileCache
- {
- public:
- WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
- ~WriteFileCache() { Flush(); }
-
- template<typename TBufferType>
- void WriteToFile(uint32_t TargetIndex,
- std::function<std::filesystem::path(uint32_t TargetIndex)>&& GetTargetPath,
- const TBufferType& Buffer,
- uint64_t FileOffset,
- uint64_t TargetFinalSize)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile");
- if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
- ZEN_ASSERT(OpenFileWriter);
- OpenFileWriter->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- }
- else
- {
- std::unique_ptr<BasicFile> NewOutputFile;
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open");
- Flush();
- const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
- CreateDirectories(TargetPath.parent_path());
- uint32_t Tries = 5;
- NewOutputFile =
- std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
- if (Tries < 3)
- {
- ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
- }
- if (Tries > 1)
- {
- Sleep(100);
- }
- return --Tries > 0;
- });
- m_DiskStats.OpenWriteCount++;
- m_DiskStats.CurrentOpenFileCount++;
- }
-
- const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
- if (CacheWriter)
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
- ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());
-
- OutputFile = std::move(NewOutputFile);
- if (UseSparseFiles)
- {
- void* Handle = OutputFile->Handle();
- if (!PrepareFileForScatteredWrite(Handle, TargetFinalSize))
- {
- ZEN_DEBUG("Unable to to prepare file '{}' with size {} for random write",
- GetTargetPath(TargetIndex),
- TargetFinalSize);
- }
- }
- OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
-
- OpenFileWriter->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- SeenTargetIndexes.push_back(TargetIndex);
- }
- else
- {
- ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
- NewOutputFile->Write(Buffer, FileOffset);
- m_DiskStats.WriteCount++;
- m_DiskStats.WriteByteCount += Buffer.GetSize();
- NewOutputFile = {};
- m_DiskStats.CurrentOpenFileCount--;
- }
- }
- }
-
- void Flush()
- {
- ZEN_TRACE_CPU("WriteFileCache_Flush");
- if (OutputFile)
- {
- m_DiskStats.CurrentOpenFileCount--;
- }
-
- OpenFileWriter = {};
- OutputFile = {};
- }
- DiskStatistics& m_DiskStats;
- std::vector<uint32_t> SeenTargetIndexes;
- std::unique_ptr<BasicFile> OutputFile;
- std::unique_ptr<BasicFileWriter> OpenFileWriter;
- };
-
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets(
std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const ChunkedContentLookup& Lookup,
@@ -4581,7 +4521,7 @@ namespace {
VerifySize,
ExpectedSize));
}
- ZEN_TRACE_CPU("HashSequence");
+
const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
if (VerifyChunkHash != SequenceRawHash)
{
@@ -4651,6 +4591,77 @@ namespace {
return CompletedSequenceIndexes;
}
+ void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ BufferedWriteFileCache::Local& LocalWriter,
+ const CompositeBuffer& Chunk,
+ const uint32_t SequenceIndex,
+ const uint64_t FileOffset,
+ const uint32_t PathIndex,
+ DiskStatistics& DiskStats)
+ {
+ ZEN_TRACE_CPU("WriteSequenceChunk");
+
+ const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex];
+
+ auto OpenFile = [&](BasicFile& File) {
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ File.Open(FileName, BasicFile::Mode::kWrite);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
+ }
+ };
+
+ const uint64_t ChunkSize = Chunk.GetSize();
+ ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
+ if (ChunkSize == SequenceSize)
+ {
+ BasicFile SingleChunkFile;
+ OpenFile(SingleChunkFile);
+
+ DiskStats.CurrentOpenFileCount++;
+ auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; });
+ SingleChunkFile.Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ const uint64_t MaxWriterBufferSize = 256u * 1025u;
+
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
+ if (Writer)
+ {
+ if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+ OpenFile(*Writer->File);
+ if (ChunkSize < MaxWriterBufferSize)
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ChunkSize;
+ }
+ }
+ }
+
struct BlockWriteOps
{
std::vector<CompositeBuffer> ChunkBuffers;
@@ -4667,13 +4678,15 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
+ BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
WorkerThreadPool& VerifyPool,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
+
{
- WriteFileCache OpenFileCache(DiskStats);
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
if (AbortFlag)
@@ -4685,25 +4698,15 @@ namespace {
ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
const uint64_t FileOffset = WriteOp.Target->Offset;
const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
+ WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats);
}
}
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
@@ -4713,6 +4716,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ WriteCache.Close(CompletedChunkSequences);
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4864,6 +4868,7 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -4896,6 +4901,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4920,6 +4926,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4939,6 +4946,7 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
@@ -4963,6 +4971,7 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ WriteCache,
Work,
VerifyPool,
DiskStats);
@@ -4974,75 +4983,15 @@ namespace {
}
}
- SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
- {
- ZEN_TRACE_CPU("Decompress");
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash));
- }
- if (RawHash != ChunkHash)
- {
- throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash));
- }
- if (RawSize != ChunkRawSize)
- {
- throw std::runtime_error(
- fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize));
- }
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash));
- }
-
- SharedBuffer Decompressed = Compressed.Decompress();
-
- if (!Decompressed)
- {
- throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
- }
- return Decompressed;
- }
-
- void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
- CompositeBuffer&& ChunkData,
- WriteFileCache& OpenFileCache)
- {
- ZEN_TRACE_CPU("WriteChunkToDisk");
-
- for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets)
- {
- const auto& Target = *TargetPtr;
- const uint64_t FileOffset = Target.Offset;
- const uint32_t SequenceIndex = Target.SequenceIndex;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
-
- OpenFileCache.WriteToFile(
- SequenceIndex,
- [&CacheFolderPath, &Content](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkData,
- FileOffset,
- Content.RawSizes[PathIndex]);
- }
- }
-
- bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+ bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
{
if (Locations.size() == 1)
{
const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
- if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+ if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
{
+ ZEN_ASSERT_SLOW(Locations[0]->Offset == 0);
return true;
}
}
@@ -5087,13 +5036,14 @@ namespace {
DiskStats.ReadByteCount += SourceSize;
if (!AbortFlag)
{
- DecompressedTemp.Write(RangeBuffer, Offset);
for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
{
Hash.Append(Segment.GetView());
+ DecompressedTemp.Write(Segment, Offset);
+ Offset += Segment.GetSize();
+ DiskStats.WriteByteCount += Segment.GetSize();
+ DiskStats.WriteCount++;
}
- DiskStats.WriteByteCount += RangeBuffer.GetSize();
- DiskStats.WriteCount++;
return true;
}
return false;
@@ -5128,37 +5078,80 @@ namespace {
const ChunkedContentLookup& RemoteLookup,
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
IoBuffer&& CompressedPart,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteCompressedChunk");
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
+ if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs))
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
+ return false;
}
else
{
- SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash));
+ }
+ if (RawHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash));
+ }
- if (!AbortFlag)
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ IoHashStream Hash;
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
+ {
+ for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs)
+ {
+ const auto& Target = *TargetPtr;
+ const uint64_t FileOffset = Target.Offset + Offset;
+ const uint32_t SequenceIndex = Target.SequenceIndex;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunk(TargetFolder,
+ RemoteContent,
+ LocalWriter,
+ RangeBuffer,
+ SequenceIndex,
+ FileOffset,
+ PathIndex,
+ DiskStats);
+ }
+
+ return true;
+ }
+ return false;
+ });
+
+ if (AbortFlag)
{
- WriteFileCache OpenFileCache(DiskStats);
- WriteChunkToDisk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(std::move(Chunk)),
- OpenFileCache);
- return true;
+ return false;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash));
}
+
+ return true;
}
- return false;
}
void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
@@ -5166,6 +5159,7 @@ namespace {
const ChunkedContentLookup& RemoteLookup,
uint32_t RemoteChunkIndex,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
@@ -5230,6 +5224,7 @@ namespace {
CompressedChunkPath,
RemoteChunkIndex,
TotalPartWriteCount,
+ &WriteCache,
&DiskStats,
&WritePartsComplete,
&FilteredWrittenBytesPerSecond,
@@ -5264,6 +5259,7 @@ namespace {
RemoteLookup,
ChunkHash,
ChunkTargetPtrs,
+ WriteCache,
std::move(CompressedPart),
DiskStats);
if (!AbortFlag)
@@ -5281,6 +5277,7 @@ namespace {
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6387,6 +6384,8 @@ namespace {
}
}
+ BufferedWriteFileCache WriteCache;
+
for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++)
{
if (AbortFlag)
@@ -6487,6 +6486,7 @@ namespace {
PreferredMultipartChunkSize,
TotalRequestCount,
TotalPartWriteCount,
+ &WriteCache,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
if (!AbortFlag)
@@ -6535,6 +6535,7 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&DiskStats,
@@ -6568,6 +6569,7 @@ namespace {
RemoteLookup,
ChunkHash,
ChunkTargetPtrs,
+ WriteCache,
std::move(CompressedPart),
DiskStats);
WritePartsComplete++;
@@ -6583,6 +6585,7 @@ namespace {
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6608,11 +6611,12 @@ namespace {
&ZenFolderPath,
&Storage,
BuildId = Oid(BuildId),
- &PrimeCacheOnly,
+ PrimeCacheOnly,
&RemoteContent,
&RemoteLookup,
&ExistsResult,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&NetworkPool,
@@ -6654,6 +6658,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(BuildBlob),
@@ -6683,6 +6688,7 @@ namespace {
BuildId,
PrimeCacheOnly,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
ChunkHash,
@@ -6718,6 +6724,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(Payload),
@@ -6763,6 +6770,7 @@ namespace {
RemoteLookup,
RemoteChunkIndex,
std::move(ChunkTargetPtrs),
+ WriteCache,
Work,
WritePool,
std::move(BuildBlob),
@@ -6800,6 +6808,7 @@ namespace {
&CacheFolderPath,
&LocalLookup,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&FilteredWrittenBytesPerSecond,
@@ -6892,8 +6901,9 @@ namespace {
tsl::robin_set<uint32_t> ChunkIndexesWritten;
- BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
- WriteFileCache OpenFileCache(DiskStats);
+ BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
{
if (AbortFlag)
@@ -6940,18 +6950,17 @@ namespace {
}
CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
+
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunk(CacheFolderPath,
+ RemoteContent,
+ LocalWriter,
+ ChunkSource,
+ RemoteSequenceIndex,
+ FileOffset,
+ RemotePathIndex,
+ DiskStats);
CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
@@ -6960,7 +6969,7 @@ namespace {
}
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
std::vector<uint32_t> CompletedChunkSequences;
for (const WriteOp& Op : WriteOps)
{
@@ -6970,6 +6979,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ WriteCache.Close(CompletedChunkSequences);
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
RemoteContent,
RemoteLookup,
@@ -7003,6 +7013,7 @@ namespace {
&RemoteLookup,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
&Work,
&WritePool,
&BlockDescriptions,
@@ -7038,6 +7049,7 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7080,6 +7092,7 @@ namespace {
&RemoteContent,
&SequenceIndexChunksLeftToWriteCounters,
&ExistsResult,
+ &WriteCache,
&FilteredDownloadedBytesPerSecond,
TotalRequestCount,
&WritePartsComplete,
@@ -7185,6 +7198,7 @@ namespace {
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
&WritePartsComplete,
+ &WriteCache,
&Work,
TotalPartWriteCount,
&WritePool,
@@ -7230,6 +7244,7 @@ namespace {
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7284,6 +7299,7 @@ namespace {
&WritePool,
&RemoteContent,
&RemoteLookup,
+ &WriteCache,
&CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
@@ -7388,6 +7404,7 @@ namespace {
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
&BlockDescriptions,
+ &WriteCache,
&WriteChunkStats,
&DiskStats,
&WritePartsComplete,
@@ -7429,6 +7446,7 @@ namespace {
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache,
DiskStats))
{
std::error_code DummyEc;
@@ -7753,6 +7771,7 @@ namespace {
Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("FinalizeTree_Remove");
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
SetFileReadOnlyWithRetry(LocalFilePath, false);
RemoveFileWithRetry(LocalFilePath);
@@ -8815,7 +8834,7 @@ namespace {
if (!ChunkController)
{
ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
- ChunkController = CreateBasicChunkingController();
+ ChunkController = CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
}
LocalContent = GetLocalContent(LocalFolderScanStats,
@@ -8899,7 +8918,10 @@ namespace {
{
BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path);
+
+ uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0));
+
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize));
FolderContent LocalFolderState;
DiskStatistics DiskStats;
@@ -9037,8 +9059,9 @@ namespace {
ChunkedFolderContent CompareFolderContent;
{
- std::unique_ptr<ChunkingController> ChunkController = CreateBasicChunkingController();
- std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions;
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
+ std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions;
if (OnlyChunked)
{
ExcludeExtensions.insert(ExcludeExtensions.end(),
@@ -10717,7 +10740,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ExtendableStringBuilder<256> SB;
CompactBinaryToJson(MetaData, SB);
- ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
+ ZEN_CONSOLE("Upload Build {}, Part {} ({}) from '{}'\n{}", m_BuildId, BuildPartId, m_BuildPartName, m_Path, SB.ToView());
}
const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path);