diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-01 10:10:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-01 10:10:53 +0100 |
| commit | 19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef (patch) | |
| tree | c0aa4e89d4a4eaf9d202898a37a3303182907c22 /src | |
| parent | improve error handling (#289) (diff) | |
| download | zen-19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef.tar.xz zen-19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef.zip | |
builds download incremental (#290)
* incremental download
* merge rebuild state and output state building
* fix writing when > 1 zero size file
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1235 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 7 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 10 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 12 |
4 files changed, 635 insertions, 629 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 18cc7cf9e..28c794559 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -82,7 +82,7 @@ namespace { const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); - const std::string ZenTempReuseFolderName = fmt::format("{}/reuse", ZenTempFolderName); + const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); @@ -115,6 +115,54 @@ namespace { ); + uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) + { +#if ZEN_PLATFORM_WINDOWS + if (SourcePlatform == SourcePlatform::Windows) + { + SetFileAttributes(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentAttributes = GetFileAttributes(FilePath); + uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes)); + if (CurrentAttributes != NewAttributes) + { + SetFileAttributes(FilePath, NewAttributes); + } + return NewAttributes; + } +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (SourcePlatform != SourcePlatform::Windows) + { + SetFileMode(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentMode = GetFileMode(FilePath); + uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes)); + if (CurrentMode != NewMode) + { + SetFileMode(FilePath, NewMode); + } + return NewMode; + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + }; + + uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) + { +#if ZEN_PLATFORM_WINDOWS + return GetFileAttributes(FilePath); +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + return GetFileMode(FilePath); +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + template<typename T> std::string FormatArray(std::span<const T> Items, std::string_view Prefix) { @@ -181,9 +229,8 @@ namespace { // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory std::span<const SharedBuffer> Segments = Buffer.GetSegments(); ZEN_ASSERT(Buffer.GetSegments().size() > 0); - size_t SegmentIndexToCheck = Segments.size() > 1 ? 1 : 0; IoBufferFileReference FileRef; - if (Segments[SegmentIndexToCheck].GetFileReference(FileRef)) + if (Segments.back().GetFileReference(FileRef)) { return Buffer; } @@ -2114,7 +2161,7 @@ namespace { Stopwatch PutBuildTimer; CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); ZEN_CONSOLE("PutBuild took {}. Payload size: {}", - NiceLatencyNs(PutBuildTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(PutBuildTimer.GetElapsedTimeMs()), NiceBytes(MetaData.GetSize())); PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(PreferredMultipartChunkSize); } @@ -2122,9 +2169,7 @@ namespace { { Stopwatch GetBuildTimer; CbObject Build = Storage.GetBuild(BuildId); - ZEN_CONSOLE("GetBuild took {}. Payload size: {}", - NiceLatencyNs(GetBuildTimer.GetElapsedTimeUs() * 1000), - NiceBytes(Build.GetSize())); + ZEN_CONSOLE("GetBuild took {}. Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), NiceBytes(Build.GetSize())); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { PreferredMultipartChunkSize = ChunkSize; @@ -2439,7 +2484,7 @@ namespace { Stopwatch PutBuildPartResultTimer; std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are missing.", - NiceLatencyNs(PutBuildPartResultTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), PutBuildPartResult.second.size()); IoHash PartHash = PutBuildPartResult.first; @@ -2531,7 +2576,7 @@ namespace { Stopwatch FinalizeBuildPartTimer; std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.", - NiceLatencyNs(FinalizeBuildPartTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); if (Needs.empty()) { @@ -2545,7 +2590,7 @@ namespace { { Stopwatch FinalizeBuildTimer; Storage.FinalizeBuild(BuildId); - ZEN_CONSOLE("FinalizeBuild took {}", NiceLatencyNs(FinalizeBuildTimer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } if (!NewBlocks.BlockDescriptions.empty()) @@ -2949,7 +2994,7 @@ namespace { const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); if (CacheWriter) { - ZEN_ASSERT(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); + ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); OutputFile = std::move(NewOutputFile); OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u)); @@ -2994,7 +3039,7 @@ namespace { return ChunkTargetPtrs; }; - bool WriteBlockToDisk(const std::filesystem::path& Path, + bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& Content, const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags, const CompositeBuffer& DecompressedBlockBuffer, @@ -3061,22 +3106,30 @@ namespace { return Lhs.Target->Offset < Rhs.Target->Offset; }); - WriteFileCache OpenFileCache; - for (const WriteOpData& WriteOp : WriteOps) { - const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t PathIndex = WriteOp.Target->PathIndex; - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - PathIndex, - [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); }, - Chunk, - FileOffset, - Content.RawSizes[PathIndex]); - OutBytesWritten += ChunkSize; + WriteFileCache OpenFileCache; + for (const WriteOpData& WriteOp : WriteOps) + { + if (AbortFlag) + { + break; + } + const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t PathIndex = WriteOp.Target->PathIndex; + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + PathIndex, + [&CacheFolderPath, &Content](uint32_t TargetIndex) { + return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred(); + }, + Chunk, + FileOffset, + Content.RawSizes[PathIndex]); + OutBytesWritten += ChunkSize; + } } OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } @@ -3086,11 +3139,11 @@ namespace { return false; } - SharedBuffer Decompress(const IoBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) + SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) { IoHash RawHash; uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(CompressedChunk), RawHash, RawSize); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash)); @@ -3118,7 +3171,7 @@ namespace { return Decompressed; } - void WriteChunkToDisk(const std::filesystem::path& Path, + void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& Content, std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargets, const CompositeBuffer& ChunkData, @@ -3132,7 +3185,10 @@ namespace { OpenFileCache.WriteToFile( Target.PathIndex, - [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); }, + [&CacheFolderPath, &Content](uint32_t TargetIndex) { + return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred(); + // return (Path / Content.Paths[TargetIndex]).make_preferred(); + }, ChunkData, FileOffset, Content.RawSizes[Target.PathIndex]); @@ -3141,7 +3197,8 @@ namespace { } void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& Path, + const std::filesystem::path& TempFolderPath, + const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const Oid& BuildId, @@ -3166,7 +3223,7 @@ namespace { std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); std::error_code Ec; - Workload->TempFile.CreateTemporary(Path / ZenTempChunkFolderName, Ec); + Workload->TempFile.CreateTemporary(TempFolderPath, Ec); if (Ec) { throw std::runtime_error( @@ -3176,7 +3233,7 @@ namespace { BuildId, ChunkHash, PreferredMultipartChunkSize, - [&Path, + [&CacheFolderPath, &RemoteContent, &RemoteLookup, &Work, @@ -3203,7 +3260,7 @@ namespace { Work.ScheduleWork( WritePool, - [&Path, + [&CacheFolderPath, &RemoteContent, &RemoteLookup, ChunkHash, @@ -3234,8 +3291,9 @@ namespace { uint32_t ChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash); - SharedBuffer Chunk = - Decompress(CompressedPart, ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); // ZEN_ASSERT_SLOW(ChunkHash == // IoHash::HashBuffer(Chunk.AsIoBuffer())); @@ -3244,7 +3302,7 @@ namespace { { WriteFileCache OpenFileCache; - WriteChunkToDisk(Path, + WriteChunkToDisk(CacheFolderPath, RemoteContent, ChunkTargetPtrs, CompositeBuffer(Chunk), @@ -3290,6 +3348,7 @@ namespace { bool WipeTargetFolder, FolderContent& OutLocalFolderState) { + ZEN_UNUSED(WipeTargetFolder); std::atomic<uint64_t> DownloadedBlocks = 0; std::atomic<uint64_t> BlockBytes = 0; std::atomic<uint64_t> DownloadedChunks = 0; @@ -3307,16 +3366,17 @@ namespace { ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); - const std::filesystem::path CacheFolderPath = Path / ZenTempReuseFolderName; + const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> LocalRawHashToPathIndex; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToLocalPathIndex; - if (!WipeTargetFolder) { Stopwatch CacheTimer; for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { + ZEN_ASSERT_SLOW(std::filesystem::exists(Path / LocalContent.Paths[LocalPathIndex])); + if (LocalContent.RawSizes[LocalPathIndex] > 0) { const uint32_t SequenceRawHashIndex = @@ -3325,99 +3385,33 @@ namespace { if (ChunkCount > 0) { const IoHash LocalRawHash = LocalContent.RawHashes[LocalPathIndex]; - if (!LocalRawHashToPathIndex.contains(LocalRawHash)) + if (!RawHashToLocalPathIndex.contains(LocalRawHash)) { - LocalRawHashToPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex); + RawHashToLocalPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex); } } } } - - { - std::vector<bool> IncludeLocalFiles(LocalContent.Paths.size(), false); - - for (const IoHash& ChunkHash : RemoteContent.ChunkedContent.ChunkHashes) - { - if (auto It = LocalLookup.ChunkHashToChunkIndex.find(ChunkHash); It != LocalLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t LocalChunkIndex = It->second; - std::span<const ChunkedContentLookup::ChunkLocation> LocalChunkTargetRange = - GetChunkLocations(LocalLookup, LocalChunkIndex); - if (!LocalChunkTargetRange.empty()) - { - std::uint32_t LocalPathIndex = LocalChunkTargetRange[0].PathIndex; - IncludeLocalFiles[LocalPathIndex] = true; - } - } - } - for (const IoHash& RawHash : RemoteContent.RawHashes) - { - if (auto It = LocalRawHashToPathIndex.find(RawHash); It != LocalRawHashToPathIndex.end()) - { - uint32_t LocalPathIndex = It->second; - IncludeLocalFiles[LocalPathIndex] = true; - } - } - - for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) - { - if (!IncludeLocalFiles[LocalPathIndex]) - { - LocalRawHashToPathIndex.erase(LocalContent.RawHashes[LocalPathIndex]); - } - } - } - - uint64_t CachedBytes = 0; - CreateDirectories(CacheFolderPath); - for (auto& CachedLocalFile : LocalRawHashToPathIndex) - { - const IoHash& LocalRawHash = CachedLocalFile.first; - const uint32_t LocalPathIndex = CachedLocalFile.second; - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - const std::filesystem::path CacheFilePath = (CacheFolderPath / LocalRawHash.ToHexString()).make_preferred(); - - SetFileReadOnly(LocalFilePath, false); - - std::filesystem::rename(LocalFilePath, CacheFilePath); - CachedBytes += std::filesystem::file_size(CacheFilePath); - } - - ZEN_CONSOLE("Cached {} ({}) local files in {}", - LocalRawHashToPathIndex.size(), - NiceBytes(CachedBytes), - NiceTimeSpanMs(CacheTimer.GetElapsedTimeMs())); - } - - if (AbortFlag) - { - return; } - - CleanDirectory(Path, DefaultExcludeFolders); - Stopwatch CacheMappingTimer; std::atomic<uint64_t> BytesWritten = 0; uint64_t CacheMappedBytesForReuse = 0; - std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false); - std::vector<std::atomic<bool>> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false); + std::vector<bool> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); struct CacheCopyData { - std::filesystem::path OriginalSourceFileName; - IoHash LocalFileRawHash; - uint64_t LocalFileRawSize = 0; - std::vector<uint32_t> RemotePathIndexes; - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkSourcePtrs; + uint32_t LocalPathIndex; + std::vector<const ChunkedContentLookup::ChunkLocation*> TargetChunkLocationPtrs; struct ChunkTarget { - uint32_t ChunkSourceCount; + uint32_t TargetChunkLocationCount; uint64_t ChunkRawSize; - uint64_t LocalFileOffset; + uint64_t CacheFileOffset; }; std::vector<ChunkTarget> ChunkTargets; }; @@ -3426,42 +3420,24 @@ namespace { std::vector<CacheCopyData> CacheCopyDatas; uint32_t ChunkCountToWrite = 0; - // Pick up all whole files to copy and/or move + // Pick up all whole files we can use from current local state for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { const IoHash& RemoteRawHash = RemoteContent.RawHashes[RemotePathIndex]; - if (auto It = LocalRawHashToPathIndex.find(RemoteRawHash); It != LocalRawHashToPathIndex.end()) + if (auto It = RawHashToLocalPathIndex.find(RemoteRawHash); It != RawHashToLocalPathIndex.end()) { - if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(RemoteRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) - { - CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; - Data.RemotePathIndexes.push_back(RemotePathIndex); - } - else - { - const uint32_t LocalPathIndex = It->second; - ZEN_ASSERT(LocalContent.RawSizes[LocalPathIndex] == RemoteContent.RawSizes[RemotePathIndex]); - ZEN_ASSERT(LocalContent.RawHashes[LocalPathIndex] == RemoteContent.RawHashes[RemotePathIndex]); - RawHashToCacheCopyDataIndex.insert_or_assign(RemoteRawHash, CacheCopyDatas.size()); - CacheCopyDatas.push_back(CacheCopyData{.OriginalSourceFileName = LocalContent.Paths[LocalPathIndex], - .LocalFileRawHash = RemoteRawHash, - .LocalFileRawSize = LocalContent.RawSizes[LocalPathIndex], - .RemotePathIndexes = {RemotePathIndex}}); - CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; - ChunkCountToWrite++; - } RemotePathIndexWantsCopyFromCacheFlags[RemotePathIndex] = true; + CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; } } - // Pick up all chunks in cached files and make sure we block moving of cache files if we need part of them - for (auto& CachedLocalFile : LocalRawHashToPathIndex) + // Pick up all chunks in current local state + for (auto& CachedLocalFile : RawHashToLocalPathIndex) { const IoHash& LocalFileRawHash = CachedLocalFile.first; const uint32_t LocalPathIndex = CachedLocalFile.second; const uint32_t LocalSequenceRawHashIndex = LocalLookup.RawHashToSequenceRawHashIndex.at(LocalFileRawHash); - const uint32_t LocalOrderOffset = - LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex]; // CachedLocalFile.second.ChunkOrderOffset; + const uint32_t LocalOrderOffset = LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex]; { uint64_t SourceOffset = 0; @@ -3482,30 +3458,30 @@ namespace { if (!ChunkTargetPtrs.empty()) { - CacheCopyData::ChunkTarget Target = {.ChunkSourceCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .ChunkRawSize = LocalChunkRawSize, - .LocalFileOffset = SourceOffset}; + CacheCopyData::ChunkTarget Target = { + .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .ChunkRawSize = LocalChunkRawSize, + .CacheFileOffset = SourceOffset}; if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalFileRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) { CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; - Data.ChunkSourcePtrs.insert(Data.ChunkSourcePtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end()); + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); Data.ChunkTargets.push_back(Target); } else { RawHashToCacheCopyDataIndex.insert_or_assign(LocalFileRawHash, CacheCopyDatas.size()); CacheCopyDatas.push_back( - CacheCopyData{.OriginalSourceFileName = LocalContent.Paths[LocalPathIndex], - .LocalFileRawHash = LocalFileRawHash, - .LocalFileRawSize = LocalContent.RawSizes[LocalPathIndex], - .RemotePathIndexes = {}, - .ChunkSourcePtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); + CacheCopyData{.LocalPathIndex = LocalPathIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); } CacheMappedBytesForReuse += LocalChunkRawSize; + RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true; } - RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true; } } SourceOffset += LocalChunkRawSize; @@ -3535,531 +3511,554 @@ namespace { ZEN_CONSOLE("Mapped {} cached data for reuse in {}", NiceBytes(CacheMappedBytesForReuse), NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs())); + { + WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - auto CopyChunksFromCacheFile = [](const std::filesystem::path& Path, - BufferedOpenFile& SourceFile, - WriteFileCache& OpenFileCache, - const ChunkedFolderContent& RemoteContent, - const uint64_t LocalFileSourceOffset, - const uint64_t LocalChunkRawSize, - std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargetPtrs, - uint64_t& OutBytesWritten) { - CompositeBuffer Chunk = SourceFile.GetRange(LocalFileSourceOffset, LocalChunkRawSize); - uint64_t TotalBytesWritten = 0; - - WriteChunkToDisk(Path, RemoteContent, ChunkTargetPtrs, Chunk, OpenFileCache, TotalBytesWritten); - OutBytesWritten += TotalBytesWritten; - }; - - auto CloneFullFileFromCache = [](const std::filesystem::path& Path, - const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const IoHash& FileRawHash, - const uint64_t FileRawSize, - std::span<const uint32_t> FullCloneRemotePathIndexes, - bool CanMove, - uint64_t& OutBytesWritten) { - const std::filesystem::path CacheFilePath = (CacheFolderPath / FileRawHash.ToHexString()).make_preferred(); + ProgressBar WriteProgressBar(UsePlainProgress); + ParallellWork Work(AbortFlag); - size_t CopyCount = FullCloneRemotePathIndexes.size(); - if (CanMove) - { - // If every reference to this chunk has are full files we can move the cache file to the last target - CopyCount--; - } + std::atomic<uint64_t> BytesDownloaded = 0; - for (uint32_t RemotePathIndex : FullCloneRemotePathIndexes) + for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) { - const std::filesystem::path TargetPath = (Path / RemoteContent.Paths[RemotePathIndex]).make_preferred(); - CreateDirectories(TargetPath.parent_path()); - - if (CopyCount == 0) - { - std::filesystem::rename(CacheFilePath, TargetPath); - } - else + if (AbortFlag) { - CopyFile(CacheFilePath, TargetPath, {.EnableClone = false}); - ZEN_ASSERT(CopyCount > 0); - CopyCount--; + break; } - OutBytesWritten += FileRawSize; - } - }; - - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - - ProgressBar WriteProgressBar(UsePlainProgress); - ParallellWork Work(AbortFlag); - - std::atomic<uint64_t> BytesDownloaded = 0; - - for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) - { - if (AbortFlag) - { - break; - } - - Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// - [&, CopyDataIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - const std::filesystem::path CacheFilePath = - (CacheFolderPath / CopyData.LocalFileRawHash.ToHexString()).make_preferred(); - if (!CopyData.ChunkSourcePtrs.empty()) + Work.ScheduleWork( + WritePool, // GetSyncWorkerPool(),// + [&, CopyDataIndex](std::atomic<bool>&) { + if (!AbortFlag) { - uint64_t CacheLocalFileBytesRead = 0; - - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets(CopyData.ChunkSourcePtrs); - - struct WriteOp + const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; + const std::filesystem::path LocalFilePath = + (Path / LocalContent.Paths[CopyData.LocalPathIndex]).make_preferred(); + if (!CopyData.TargetChunkLocationPtrs.empty()) { - const ChunkedContentLookup::ChunkLocation* Target; - uint64_t LocalFileOffset; - uint64_t ChunkSize; - }; + uint64_t CacheLocalFileBytesRead = 0; - std::vector<WriteOp> WriteOps; - WriteOps.reserve(CopyData.ChunkSourcePtrs.size()); + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) - { - std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.ChunkSourceCount); - for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange) + struct WriteOp { - WriteOps.push_back(WriteOp{.Target = Target, - .LocalFileOffset = ChunkTarget.LocalFileOffset, - .ChunkSize = ChunkTarget.ChunkRawSize}); - } - TargetStart += ChunkTarget.ChunkSourceCount; - } + const ChunkedContentLookup::ChunkLocation* Target; + uint64_t CacheFileOffset; + uint64_t ChunkSize; + }; - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) + std::vector<WriteOp> WriteOps; + WriteOps.reserve(AllTargets.size()); + + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { - return true; + std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange) + { + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkSize = ChunkTarget.ChunkRawSize}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; } - else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) - { + + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) + { + return true; + } + else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) + { + return false; + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } return false; + }); + + { + BufferedOpenFile SourceFile(LocalFilePath); + WriteFileCache OpenFileCache; + for (const WriteOp& Op : WriteOps) + { + if (AbortFlag) + { + break; + } + const uint32_t RemotePathIndex = Op.Target->PathIndex; + const uint64_t ChunkSize = Op.ChunkSize; + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); + + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + RemotePathIndex, + [&CacheFolderPath, &RemoteContent](uint32_t TargetIndex) { + return (CacheFolderPath / RemoteContent.RawHashes[TargetIndex].ToHexString()) + .make_preferred(); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); + BytesWritten += ChunkSize; + WriteToDiskBytes += ChunkSize; + CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? + } } - if (Lhs.Target->Offset < Rhs.Target->Offset) + if (!AbortFlag) { - return true; + ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); + ZEN_DEBUG("Copied {} from {}", + NiceBytes(CacheLocalFileBytesRead), + LocalContent.Paths[CopyData.LocalPathIndex]); } - return false; - }); + } + } + }, + Work.DefaultErrorFunction()); + } - BufferedOpenFile SourceFile(CacheFilePath); - WriteFileCache OpenFileCache; - for (const WriteOp& Op : WriteOps) - { - if (AbortFlag) + for (const IoHash ChunkHash : LooseChunkHashes) + { + if (AbortFlag) + { + break; + } + + uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash); + if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex]) + { + ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + continue; + } + bool NeedsCopy = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) + { + std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex); + + if (ChunkTargetPtrs.empty()) + { + ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { + if (!AbortFlag) { - break; - } - const uint32_t RemotePathIndex = Op.Target->PathIndex; - const uint64_t ChunkSize = Op.ChunkSize; - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.LocalFileOffset, ChunkSize); + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + { + DownloadLargeBlob(Storage, + Path / ZenTempChunkFolderName, + CacheFolderPath, + RemoteContent, + RemoteLookup, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + ChunkTargetPtrs, + Work, + WritePool, + NetworkPool, + BytesWritten, + WriteToDiskBytes, + BytesDownloaded, + LooseChunksBytes, + DownloadedChunks, + ChunkCountWritten, + MultipartAttachmentCount); + } + else + { + IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash); + if (!CompressedPart) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + BytesDownloaded += CompressedPart.GetSize(); + LooseChunksBytes += CompressedPart.GetSize(); + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)), + Path / ZenTempChunkFolderName, + ChunkHash); + DownloadedChunks++; - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(Payload)]( + std::atomic<bool>&) { + if (!AbortFlag) + { + uint64_t TotalBytesWritten = 0; + SharedBuffer Chunk = + Decompress(CompressedPart, + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); - OpenFileCache.WriteToFile<CompositeBuffer>( - RemotePathIndex, - [&Path, &RemoteContent](uint32_t TargetIndex) { - return (Path / RemoteContent.Paths[TargetIndex]).make_preferred(); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); - BytesWritten += ChunkSize; - WriteToDiskBytes += ChunkSize; - CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? - } - if (!AbortFlag) - { - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), CopyData.OriginalSourceFileName); - } - } + { + WriteFileCache OpenFileCache; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + TotalBytesWritten); + } + ChunkCountWritten++; + BytesWritten += TotalBytesWritten; + WriteToDiskBytes += TotalBytesWritten; + } + }, + Work.DefaultErrorFunction()); + } + } + } + }, + Work.DefaultErrorFunction()); + } + } + } - if (CopyData.RemotePathIndexes.empty()) - { - std::filesystem::remove(CacheFilePath); - } - else if (!AbortFlag) + size_t BlockCount = BlockDescriptions.size(); + std::atomic<size_t> BlocksComplete = 0; + + auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags]( + const ChunkBlockDescription& BlockDescription) -> bool { + for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes) + { + if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) { - uint64_t LocalBytesWritten = 0; - CloneFullFileFromCache(Path, - CacheFolderPath, - RemoteContent, - CopyData.LocalFileRawHash, - CopyData.LocalFileRawSize, - CopyData.RemotePathIndexes, - true, - LocalBytesWritten); - // CacheLocalFileBytesRead += CopyData.LocalFileRawSize; - BytesWritten += LocalBytesWritten; - WriteToDiskBytes += LocalBytesWritten; - ChunkCountWritten++; - - ZEN_DEBUG("Used full cached file {} ({}) for {} ({}) targets", - CopyData.OriginalSourceFileName, - NiceBytes(CopyData.LocalFileRawSize), - CopyData.RemotePathIndexes.size(), - NiceBytes(LocalBytesWritten)); + return true; } } - }, - Work.DefaultErrorFunction()); - } - - for (const IoHash ChunkHash : LooseChunkHashes) - { - if (AbortFlag) - { - break; - } + } + return false; + }; - uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash); - if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex]) + size_t BlocksNeededCount = 0; + for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); - continue; - } - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) - { - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex); - - if (ChunkTargetPtrs.empty()) + if (Work.IsAborted()) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + break; } - else + if (IsBlockNeeded(BlockDescriptions[BlockIndex])) { + BlocksNeededCount++; Work.ScheduleWork( NetworkPool, - [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { + [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); + if (!BlockBuffer) { - DownloadLargeBlob(Storage, - Path, - RemoteContent, - RemoteLookup, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - ChunkTargetPtrs, - Work, - WritePool, - NetworkPool, - BytesWritten, - WriteToDiskBytes, - BytesDownloaded, - LooseChunksBytes, - DownloadedChunks, - ChunkCountWritten, - MultipartAttachmentCount); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); } - else + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), + Path / ZenTempBlockFolderName, + BlockDescriptions[BlockIndex].BlockHash); + + if (!AbortFlag) { - IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - BytesDownloaded += CompressedPart.GetSize(); - LooseChunksBytes += CompressedPart.GetSize(); - DownloadedChunks++; + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { + if (!AbortFlag) + { + IoHash BlockRawHash; + uint64_t BlockRawSize; + CompressedBuffer CompressedBlockBuffer = + CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize); + if (!CompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", + BlockDescriptions[BlockIndex].BlockHash)); + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(CompressedPart)]( - std::atomic<bool>&) { - if (!AbortFlag) + if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) + { + throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", + BlockDescriptions[BlockIndex].BlockHash, + BlockRawHash)); + } + + CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); + if (!DecompressedBlockBuffer) { - uint64_t TotalBytesWritten = 0; - SharedBuffer Chunk = - Decompress(CompressedPart, - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); - WriteFileCache OpenFileCache; - - WriteChunkToDisk(Path, + throw std::runtime_error(fmt::format("Block {} failed to decompress", + BlockDescriptions[BlockIndex].BlockHash)); + } + + ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == + IoHash::HashBuffer(DecompressedBlockBuffer)); + + uint64_t BytesWrittenToDisk = 0; + uint32_t ChunksReadFromBlock = 0; + if (WriteBlockToDisk(CacheFolderPath, RemoteContent, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); - ChunkCountWritten++; - BytesWritten += TotalBytesWritten; - WriteToDiskBytes += TotalBytesWritten; + RemotePathIndexWantsCopyFromCacheFlags, + DecompressedBlockBuffer, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags.data(), + ChunksReadFromBlock, + BytesWrittenToDisk)) + { + BytesWritten += BytesWrittenToDisk; + WriteToDiskBytes += BytesWrittenToDisk; + ChunkCountWritten += ChunksReadFromBlock; } - }, - Work.DefaultErrorFunction()); - } + else + { + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash)); + } + BlocksComplete++; + } + }, + [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { + ZEN_ERROR("Failed writing block {}. Reason: {}", + BlockDescriptions[BlockIndex].BlockHash, + Ex.what()); + AbortFlag = true; + }); } } }, Work.DefaultErrorFunction()); } + else + { + ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + } } + + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); + WriteProgressBar.UpdateState( + {.Task = "Writing chunks ", + .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}", + ChunkCountWritten.load(), + ChunkCountToWrite, + BlocksComplete.load(), + BlocksNeededCount, + NiceBytes(BytesDownloaded.load()), + NiceBytes(BytesWritten.load())), + .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), + .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, + false); + }); + + if (AbortFlag) + { + return; + } + + WriteProgressBar.Finish(); } - size_t BlockCount = BlockDescriptions.size(); - std::atomic<size_t> BlocksComplete = 0; + std::vector<std::pair<IoHash, uint32_t>> Targets; + Targets.reserve(RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) + { + Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); + } + std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) { + return Lhs.first < Rhs.first; + }); + + // Move all files we will reuse to cache folder + for (auto It : RawHashToLocalPathIndex) + { + const IoHash& RawHash = It.first; + if (RemoteLookup.RawHashToSequenceRawHashIndex.contains(RawHash)) + { + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[It.second]).make_preferred(); + const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); + SetFileReadOnly(LocalFilePath, false); + std::filesystem::rename(LocalFilePath, CacheFilePath); + } + } - auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags]( - const ChunkBlockDescription& BlockDescription) -> bool { - for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes) + if (WipeTargetFolder) + { + // Clean target folder + ZEN_CONSOLE("Wiping {}", Path); + CleanDirectory(Path, DefaultExcludeFolders); + } + else + { + // Remove unused tracked files + tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; + RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) + { + RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); + } + std::vector<std::filesystem::path> LocalFilesToRemove; + for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { - if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) + if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string())) { - const uint32_t RemoteChunkIndex = It->second; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + if (std::filesystem::exists(LocalFilePath)) { - return true; + LocalFilesToRemove.emplace_back(std::move(LocalFilePath)); } } } - return false; - }; - - for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) - { - if (Work.IsAborted()) + if (!LocalFilesToRemove.empty()) { - break; + ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path); + for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove) + { + SetFileReadOnly(LocalFilePath, false); + std::filesystem::remove(LocalFilePath); + } } - Work.ScheduleWork( - WritePool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - if (IsBlockNeeded(BlockDescriptions[BlockIndex])) - { - Work.ScheduleWork( - NetworkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); - } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) { - if (!AbortFlag) - { - IoHash BlockRawHash; - uint64_t BlockRawSize; - CompressedBuffer CompressedBlockBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), - BlockRawHash, - BlockRawSize); - if (!CompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", - BlockDescriptions[BlockIndex].BlockHash)); - } + { + WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) - { - throw std::runtime_error( - fmt::format("Block {} header has a mismatching raw hash {}", - BlockDescriptions[BlockIndex].BlockHash, - BlockRawHash)); - } + ProgressBar RebuildProgressBar(UsePlainProgress); + ParallellWork Work(AbortFlag); - CompositeBuffer DecompressedBlockBuffer = - CompressedBlockBuffer.DecompressToComposite(); - if (!DecompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} failed to decompress", - BlockDescriptions[BlockIndex].BlockHash)); - } + OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); + OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); + OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size()); + OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size()); - ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == - IoHash::HashBuffer(DecompressedBlockBuffer)); + std::atomic<uint64_t> TargetsComplete = 0; - uint64_t BytesWrittenToDisk = 0; - uint32_t ChunksReadFromBlock = 0; - if (WriteBlockToDisk(Path, - RemoteContent, - RemotePathIndexWantsCopyFromCacheFlags, - DecompressedBlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags.data(), - ChunksReadFromBlock, - BytesWrittenToDisk)) - { - BytesWritten += BytesWrittenToDisk; - WriteToDiskBytes += BytesWrittenToDisk; - ChunkCountWritten += ChunksReadFromBlock; - } - else - { - throw std::runtime_error(fmt::format("Block {} is malformed", - BlockDescriptions[BlockIndex].BlockHash)); - } - BlocksComplete++; - } - }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { - ZEN_ERROR("Failed writing block {}. Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); - } - } - }, - Work.DefaultErrorFunction()); - } - else - { - ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); - BlocksComplete++; - } - } - }, - Work.DefaultErrorFunction()); - } - for (uint32_t PathIndex = 0; PathIndex < RemoteContent.Paths.size(); PathIndex++) - { - if (Work.IsAborted()) - { - break; - } - if (RemoteContent.RawSizes[PathIndex] == 0) + size_t TargetOffset = 0; + while (TargetOffset < Targets.size()) { + if (AbortFlag) + { + break; + } + + size_t TargetCount = 1; + const IoHash& RawHash = Targets[TargetOffset].first; + while (Targets[TargetOffset + TargetCount].first == RawHash) + { + TargetCount++; + } + Work.ScheduleWork( - WritePool, - [&, PathIndex](std::atomic<bool>&) { + WritePool, // GetSyncWorkerPool(),// + [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { if (!AbortFlag) { - const std::filesystem::path TargetPath = (Path / RemoteContent.Paths[PathIndex]).make_preferred(); - CreateDirectories(TargetPath.parent_path()); - BasicFile OutputFile; - OutputFile.Open(TargetPath, BasicFile::Mode::kTruncate); + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].first; + const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second; + const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex]; + OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex]; + const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); + if (RawHash == IoHash::Zero) + { + if (std::filesystem::exists(FirstTargetFilePath)) + { + SetFileReadOnly(FirstTargetFilePath, false); + } + CreateDirectories(FirstTargetFilePath.parent_path()); + { + BasicFile OutputFile; + OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate); + } + } + else + { + const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath)); + CreateDirectories(FirstTargetFilePath.parent_path()); + if (std::filesystem::exists(FirstTargetFilePath)) + { + SetFileReadOnly(FirstTargetFilePath, false); + } + std::filesystem::rename(CacheFilePath, FirstTargetFilePath); + } + + OutLocalFolderState.Attributes[FirstTargetPathIndex] = + RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[FirstTargetPathIndex]); + OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath); + + TargetOffset++; + TargetsComplete++; + while (TargetOffset < (BaseTargetOffset + TargetCount)) + { + ZEN_ASSERT(Targets[TargetOffset].first == RawHash); + ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath)); + const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second; + const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex]; + const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred(); + OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath; + OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex]; + CreateDirectories(ExtraTargetFilePath.parent_path()); + if (std::filesystem::exists(ExtraTargetFilePath)) + { + SetFileReadOnly(ExtraTargetFilePath, false); + } + CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false}); + + OutLocalFolderState.Attributes[ExtraTargetPathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(ExtraTargetFilePath) + : SetNativeFileAttributes(ExtraTargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[ExtraTargetPathIndex]); + OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] = + GetModificationTickFromPath(ExtraTargetFilePath); + + TargetOffset++; + TargetsComplete++; + } } }, Work.DefaultErrorFunction()); - } - } - Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); - WriteProgressBar.UpdateState( - {.Task = "Writing chunks ", - .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}", - ChunkCountWritten.load(), - ChunkCountToWrite, - BlocksComplete.load(), - BlockCount, - NiceBytes(BytesDownloaded.load()), - NiceBytes(BytesWritten.load())), - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, - false); - }); - - if (AbortFlag) - { - return; - } + TargetOffset += TargetCount; + } - WriteProgressBar.Finish(); + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + RebuildProgressBar.UpdateState( + {.Task = "Rebuilding state ", + .Details = fmt::format("Written {} files out of {}", TargetsComplete.load(), Targets.size()), + .TotalCount = gsl::narrow<uint64_t>(Targets.size()), + .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, + false); + }); - { - ProgressBar PremissionsProgressBar(false); - if (!RemoteContent.Attributes.empty()) + if (AbortFlag) { - auto SetNativeFileAttributes = - [](const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) -> uint32_t { -#if ZEN_PLATFORM_WINDOWS - if (SourcePlatform == SourcePlatform::Windows) - { - SetFileAttributes(FilePath, Attributes); - return Attributes; - } - else - { - uint32_t CurrentAttributes = GetFileAttributes(FilePath); - uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes)); - if (CurrentAttributes != NewAttributes) - { - SetFileAttributes(FilePath, NewAttributes); - } - return NewAttributes; - } -#endif // ZEN_PLATFORM_WINDOWS -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - if (SourcePlatform != SourcePlatform::Windows) - { - SetFileMode(FilePath, Attributes); - return Attributes; - } - else - { - uint32_t CurrentMode = GetFileMode(FilePath); - uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes)); - if (CurrentMode != NewMode) - { - SetFileMode(FilePath, NewMode); - } - return NewMode; - } -#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - }; - - OutLocalFolderState.Paths.reserve(RemoteContent.Paths.size()); - OutLocalFolderState.RawSizes.reserve(RemoteContent.Paths.size()); - OutLocalFolderState.Attributes.reserve(RemoteContent.Paths.size()); - OutLocalFolderState.ModificationTicks.reserve(RemoteContent.Paths.size()); - for (uint32_t PathIndex = 0; PathIndex < RemoteContent.Paths.size(); PathIndex++) - { - const std::filesystem::path LocalFilePath = (Path / RemoteContent.Paths[PathIndex]); - const uint32_t CurrentPlatformAttributes = - SetNativeFileAttributes(LocalFilePath, RemoteContent.Platform, RemoteContent.Attributes[PathIndex]); - - OutLocalFolderState.Paths.push_back(RemoteContent.Paths[PathIndex]); - OutLocalFolderState.RawSizes.push_back(RemoteContent.RawSizes[PathIndex]); - OutLocalFolderState.Attributes.push_back(CurrentPlatformAttributes); - OutLocalFolderState.ModificationTicks.push_back(GetModificationTickFromPath(LocalFilePath)); - - PremissionsProgressBar.UpdateState( - {.Task = "Set permissions ", - .Details = fmt::format("Updated {} files out of {}", PathIndex, RemoteContent.Paths.size()), - .TotalCount = RemoteContent.Paths.size(), - .RemainingCount = (RemoteContent.Paths.size() - PathIndex)}, - false); - } + return; } - PremissionsProgressBar.Finish(); + + RebuildProgressBar.Finish(); } } @@ -4077,7 +4076,7 @@ namespace { CbObject BuildObject = Storage.GetBuild(BuildId); ZEN_CONSOLE("GetBuild took {}. Payload size: {}", - NiceLatencyNs(GetBuildTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), NiceBytes(BuildObject.GetSize())); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); @@ -4162,7 +4161,7 @@ namespace { ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", BuildParts[0].first, BuildParts[0].second, - NiceLatencyNs(GetBuildPartTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), NiceBytes(BuildPartManifest.GetSize())); { @@ -4202,7 +4201,7 @@ namespace { OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", BuildPartId, - NiceLatencyNs(GetBlockMetadataTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), OutBlockDescriptions.size()); if (OutBlockDescriptions.size() != BlockRawHashes.size()) @@ -4309,7 +4308,7 @@ namespace { ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, - NiceLatencyNs(GetOverlayBuildPartTimer.GetElapsedTimeUs() * 1000), + NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()), NiceBytes(OverlayBuildPartManifest.GetSize())); ChunkedFolderContent OverlayPartContent; @@ -4457,6 +4456,7 @@ namespace { if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent)) { + const size_t LocaStatePathCount = LocalFolderState.Paths.size(); std::vector<std::filesystem::path> DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, CurrentLocalFolderContent, DeletedPaths); if (!DeletedPaths.empty()) @@ -4464,9 +4464,10 @@ namespace { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } - ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated", + ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", DeletedPaths.size(), - UpdatedContent.Paths.size()); + UpdatedContent.Paths.size(), + LocaStatePathCount); if (UpdatedContent.Paths.size() > 0) { uint64_t ByteCountToScan = 0; @@ -4535,7 +4536,7 @@ namespace { ZEN_CONSOLE("Using cached local state"); } - ZEN_CONSOLE("Read local state in {}", NiceLatencyNs(ReadStateTimer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); ScanContent = false; } } @@ -4610,7 +4611,7 @@ namespace { }); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); - CreateDirectories(Path / ZenTempReuseFolderName); + CreateDirectories(Path / ZenTempCacheFolderName); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; @@ -4698,7 +4699,7 @@ namespace { if (CompareContent(RemoteContent, LocalContent)) { ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.", - NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000)); + NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } else { @@ -4730,7 +4731,7 @@ namespace { CreateDirectories((Path / ZenStateFilePath).parent_path()); TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); - ZEN_CONSOLE("Wrote local state in {}", NiceLatencyNs(WriteStateTimer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); #if 0 ExtendableStringBuilder<1024> SB; @@ -4738,7 +4739,7 @@ namespace { WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 - ZEN_CONSOLE("Downloaded build in {}.", NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } } } @@ -5095,13 +5096,13 @@ BuildsCommand::BuildsCommand() "Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded", cxxopts::value(m_BuildPartIds), "<id>"); - m_DownloadOptions.add_option( - "", - "", - "build-part-name", - "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded", - cxxopts::value(m_BuildPartNames), - "<name>"); + m_DownloadOptions.add_option("", + "", + "build-part-name", + "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given " + "all parts will be downloaded", + cxxopts::value(m_BuildPartNames), + "<name>"); m_DownloadOptions .add_option("", "", "clean", "Delete all data in target folder before downloading", cxxopts::value(m_Clean), "<clean>"); m_DownloadOptions.add_option("", @@ -5841,7 +5842,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); }); ZEN_ASSERT(!AbortFlag.load()); - ZEN_CONSOLE("Scrambled files in {}", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("Scrambled files in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }; ScrambleDir(DownloadPath); diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8279fb952..85feab2f7 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -531,7 +531,10 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) } void -CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options, std::error_code& OutErrorCode) +CopyFile(const std::filesystem::path& FromPath, + const std::filesystem::path& ToPath, + const CopyFileOptions& Options, + std::error_code& OutErrorCode) { OutErrorCode.clear(); @@ -544,7 +547,7 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop } bool -CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options) +CopyFile(const std::filesystem::path& FromPath, const std::filesystem::path& ToPath, const CopyFileOptions& Options) { bool Success = false; diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index 20f6dc56c..e020668fc 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -97,11 +97,11 @@ struct CopyFileOptions bool MustClone = false; }; -ZENCORE_API bool CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options); -ZENCORE_API void CopyFile(std::filesystem::path FromPath, - std::filesystem::path ToPath, - const CopyFileOptions& Options, - std::error_code& OutError); +ZENCORE_API bool CopyFile(const std::filesystem::path& FromPath, const std::filesystem::path& ToPath, const CopyFileOptions& Options); +ZENCORE_API void CopyFile(const std::filesystem::path& FromPath, + const std::filesystem::path& ToPath, + const CopyFileOptions& Options, + std::error_code& OutError); ZENCORE_API void CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options); ZENCORE_API bool SupportsBlockRefCounting(std::filesystem::path Path); diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index 78ebcdd55..a4bb759e7 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -336,7 +336,8 @@ public: const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (std::filesystem::is_regular_file(BlockPath)) { - IoBuffer Payload = ReadFile(BlockPath).Flatten(); + BasicFile File(BlockPath, BasicFile::Mode::kRead); + IoBuffer Payload = File.ReadAll(); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); m_Stats.TotalBytesRead += Payload.GetSize(); Payload.SetContentType(ZenContentType::kCompressedBinary); @@ -365,13 +366,13 @@ public: struct WorkloadData { std::atomic<uint64_t> BytesRemaining; - IoBuffer BlobFile; + BasicFile BlobFile; std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->BlobFile = IoBufferBuilder::MakeFromFile(BlockPath); - const uint64_t BlobSize = Workload->BlobFile.GetSize(); + Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); + const uint64_t BlobSize = Workload->BlobFile.FileSize(); Workload->Receiver = std::move(Receiver); Workload->BytesRemaining = BlobSize; @@ -383,7 +384,8 @@ public: uint64_t Size = Min(ChunkSize, BlobSize - Offset); WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { SimulateLatency(0, 0); - IoBuffer PartPayload(Workload->BlobFile, Offset, Size); + IoBuffer PartPayload(Size); + Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); m_Stats.TotalBytesRead += PartPayload.GetSize(); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); Workload->Receiver(Offset, PartPayload, ByteRemaning); |