aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-01 10:10:53 +0100
committerGitHub Enterprise <[email protected]>2025-03-01 10:10:53 +0100
commit19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef (patch)
treec0aa4e89d4a4eaf9d202898a37a3303182907c22 /src
parentimprove error handling (#289) (diff)
downloadzen-19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef.tar.xz
zen-19b3c492dcc0fc3f8879ecb60124ca64dea9b7ef.zip
builds download incremental (#290)
* incremental download * merge rebuild state and output state building * fix writing when > 1 zero size file
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1235
-rw-r--r--src/zencore/filesystem.cpp7
-rw-r--r--src/zencore/include/zencore/filesystem.h10
-rw-r--r--src/zenutil/filebuildstorage.cpp12
4 files changed, 635 insertions, 629 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 18cc7cf9e..28c794559 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -82,7 +82,7 @@ namespace {
const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
- const std::string ZenTempReuseFolderName = fmt::format("{}/reuse", ZenTempFolderName);
+ const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
@@ -115,6 +115,54 @@ namespace {
);
+ uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ if (SourcePlatform == SourcePlatform::Windows)
+ {
+ SetFileAttributes(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentAttributes = GetFileAttributes(FilePath);
+ uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes));
+ if (CurrentAttributes != NewAttributes)
+ {
+ SetFileAttributes(FilePath, NewAttributes);
+ }
+ return NewAttributes;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (SourcePlatform != SourcePlatform::Windows)
+ {
+ SetFileMode(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentMode = GetFileMode(FilePath);
+ uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes));
+ if (CurrentMode != NewMode)
+ {
+ SetFileMode(FilePath, NewMode);
+ }
+ return NewMode;
+ }
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ };
+
+ uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ return GetFileAttributes(FilePath);
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ return GetFileMode(FilePath);
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
template<typename T>
std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
{
@@ -181,9 +229,8 @@ namespace {
// If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
std::span<const SharedBuffer> Segments = Buffer.GetSegments();
ZEN_ASSERT(Buffer.GetSegments().size() > 0);
- size_t SegmentIndexToCheck = Segments.size() > 1 ? 1 : 0;
IoBufferFileReference FileRef;
- if (Segments[SegmentIndexToCheck].GetFileReference(FileRef))
+ if (Segments.back().GetFileReference(FileRef))
{
return Buffer;
}
@@ -2114,7 +2161,7 @@ namespace {
Stopwatch PutBuildTimer;
CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData);
ZEN_CONSOLE("PutBuild took {}. Payload size: {}",
- NiceLatencyNs(PutBuildTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(PutBuildTimer.GetElapsedTimeMs()),
NiceBytes(MetaData.GetSize()));
PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(PreferredMultipartChunkSize);
}
@@ -2122,9 +2169,7 @@ namespace {
{
Stopwatch GetBuildTimer;
CbObject Build = Storage.GetBuild(BuildId);
- ZEN_CONSOLE("GetBuild took {}. Payload size: {}",
- NiceLatencyNs(GetBuildTimer.GetElapsedTimeUs() * 1000),
- NiceBytes(Build.GetSize()));
+ ZEN_CONSOLE("GetBuild took {}. Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), NiceBytes(Build.GetSize()));
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
{
PreferredMultipartChunkSize = ChunkSize;
@@ -2439,7 +2484,7 @@ namespace {
Stopwatch PutBuildPartResultTimer;
std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are missing.",
- NiceLatencyNs(PutBuildPartResultTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
NiceBytes(PartManifest.GetSize()),
PutBuildPartResult.second.size());
IoHash PartHash = PutBuildPartResult.first;
@@ -2531,7 +2576,7 @@ namespace {
Stopwatch FinalizeBuildPartTimer;
std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash);
ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.",
- NiceLatencyNs(FinalizeBuildPartTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
Needs.size());
if (Needs.empty())
{
@@ -2545,7 +2590,7 @@ namespace {
{
Stopwatch FinalizeBuildTimer;
Storage.FinalizeBuild(BuildId);
- ZEN_CONSOLE("FinalizeBuild took {}", NiceLatencyNs(FinalizeBuildTimer.GetElapsedTimeUs() * 1000));
+ ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
}
if (!NewBlocks.BlockDescriptions.empty())
@@ -2949,7 +2994,7 @@ namespace {
const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
if (CacheWriter)
{
- ZEN_ASSERT(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());
+ ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());
OutputFile = std::move(NewOutputFile);
OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
@@ -2994,7 +3039,7 @@ namespace {
return ChunkTargetPtrs;
};
- bool WriteBlockToDisk(const std::filesystem::path& Path,
+ bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
const ChunkedFolderContent& Content,
const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags,
const CompositeBuffer& DecompressedBlockBuffer,
@@ -3061,22 +3106,30 @@ namespace {
return Lhs.Target->Offset < Rhs.Target->Offset;
});
- WriteFileCache OpenFileCache;
- for (const WriteOpData& WriteOp : WriteOps)
{
- const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t PathIndex = WriteOp.Target->PathIndex;
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- PathIndex,
- [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); },
- Chunk,
- FileOffset,
- Content.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
+ WriteFileCache OpenFileCache;
+ for (const WriteOpData& WriteOp : WriteOps)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t PathIndex = WriteOp.Target->PathIndex;
+ const uint64_t ChunkSize = Chunk.GetSize();
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ PathIndex,
+ [&CacheFolderPath, &Content](uint32_t TargetIndex) {
+ return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred();
+ },
+ Chunk,
+ FileOffset,
+ Content.RawSizes[PathIndex]);
+ OutBytesWritten += ChunkSize;
+ }
}
OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size());
}
@@ -3086,11 +3139,11 @@ namespace {
return false;
}
- SharedBuffer Decompress(const IoBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
+ SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
{
IoHash RawHash;
uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(CompressedChunk), RawHash, RawSize);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize);
if (!Compressed)
{
throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash));
@@ -3118,7 +3171,7 @@ namespace {
return Decompressed;
}
- void WriteChunkToDisk(const std::filesystem::path& Path,
+ void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath,
const ChunkedFolderContent& Content,
std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargets,
const CompositeBuffer& ChunkData,
@@ -3132,7 +3185,10 @@ namespace {
OpenFileCache.WriteToFile(
Target.PathIndex,
- [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); },
+ [&CacheFolderPath, &Content](uint32_t TargetIndex) {
+ return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred();
+ // return (Path / Content.Paths[TargetIndex]).make_preferred();
+ },
ChunkData,
FileOffset,
Content.RawSizes[Target.PathIndex]);
@@ -3141,7 +3197,8 @@ namespace {
}
void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& Path,
+ const std::filesystem::path& TempFolderPath,
+ const std::filesystem::path& CacheFolderPath,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
const Oid& BuildId,
@@ -3166,7 +3223,7 @@ namespace {
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
std::error_code Ec;
- Workload->TempFile.CreateTemporary(Path / ZenTempChunkFolderName, Ec);
+ Workload->TempFile.CreateTemporary(TempFolderPath, Ec);
if (Ec)
{
throw std::runtime_error(
@@ -3176,7 +3233,7 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [&Path,
+ [&CacheFolderPath,
&RemoteContent,
&RemoteLookup,
&Work,
@@ -3203,7 +3260,7 @@ namespace {
Work.ScheduleWork(
WritePool,
- [&Path,
+ [&CacheFolderPath,
&RemoteContent,
&RemoteLookup,
ChunkHash,
@@ -3234,8 +3291,9 @@ namespace {
uint32_t ChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash);
- SharedBuffer Chunk =
- Decompress(CompressedPart, ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
// ZEN_ASSERT_SLOW(ChunkHash ==
// IoHash::HashBuffer(Chunk.AsIoBuffer()));
@@ -3244,7 +3302,7 @@ namespace {
{
WriteFileCache OpenFileCache;
- WriteChunkToDisk(Path,
+ WriteChunkToDisk(CacheFolderPath,
RemoteContent,
ChunkTargetPtrs,
CompositeBuffer(Chunk),
@@ -3290,6 +3348,7 @@ namespace {
bool WipeTargetFolder,
FolderContent& OutLocalFolderState)
{
+ ZEN_UNUSED(WipeTargetFolder);
std::atomic<uint64_t> DownloadedBlocks = 0;
std::atomic<uint64_t> BlockBytes = 0;
std::atomic<uint64_t> DownloadedChunks = 0;
@@ -3307,16 +3366,17 @@ namespace {
ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- const std::filesystem::path CacheFolderPath = Path / ZenTempReuseFolderName;
+ const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> LocalRawHashToPathIndex;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToLocalPathIndex;
- if (!WipeTargetFolder)
{
Stopwatch CacheTimer;
for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
+ ZEN_ASSERT_SLOW(std::filesystem::exists(Path / LocalContent.Paths[LocalPathIndex]));
+
if (LocalContent.RawSizes[LocalPathIndex] > 0)
{
const uint32_t SequenceRawHashIndex =
@@ -3325,99 +3385,33 @@ namespace {
if (ChunkCount > 0)
{
const IoHash LocalRawHash = LocalContent.RawHashes[LocalPathIndex];
- if (!LocalRawHashToPathIndex.contains(LocalRawHash))
+ if (!RawHashToLocalPathIndex.contains(LocalRawHash))
{
- LocalRawHashToPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex);
+ RawHashToLocalPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex);
}
}
}
}
-
- {
- std::vector<bool> IncludeLocalFiles(LocalContent.Paths.size(), false);
-
- for (const IoHash& ChunkHash : RemoteContent.ChunkedContent.ChunkHashes)
- {
- if (auto It = LocalLookup.ChunkHashToChunkIndex.find(ChunkHash); It != LocalLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t LocalChunkIndex = It->second;
- std::span<const ChunkedContentLookup::ChunkLocation> LocalChunkTargetRange =
- GetChunkLocations(LocalLookup, LocalChunkIndex);
- if (!LocalChunkTargetRange.empty())
- {
- std::uint32_t LocalPathIndex = LocalChunkTargetRange[0].PathIndex;
- IncludeLocalFiles[LocalPathIndex] = true;
- }
- }
- }
- for (const IoHash& RawHash : RemoteContent.RawHashes)
- {
- if (auto It = LocalRawHashToPathIndex.find(RawHash); It != LocalRawHashToPathIndex.end())
- {
- uint32_t LocalPathIndex = It->second;
- IncludeLocalFiles[LocalPathIndex] = true;
- }
- }
-
- for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
- {
- if (!IncludeLocalFiles[LocalPathIndex])
- {
- LocalRawHashToPathIndex.erase(LocalContent.RawHashes[LocalPathIndex]);
- }
- }
- }
-
- uint64_t CachedBytes = 0;
- CreateDirectories(CacheFolderPath);
- for (auto& CachedLocalFile : LocalRawHashToPathIndex)
- {
- const IoHash& LocalRawHash = CachedLocalFile.first;
- const uint32_t LocalPathIndex = CachedLocalFile.second;
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- const std::filesystem::path CacheFilePath = (CacheFolderPath / LocalRawHash.ToHexString()).make_preferred();
-
- SetFileReadOnly(LocalFilePath, false);
-
- std::filesystem::rename(LocalFilePath, CacheFilePath);
- CachedBytes += std::filesystem::file_size(CacheFilePath);
- }
-
- ZEN_CONSOLE("Cached {} ({}) local files in {}",
- LocalRawHashToPathIndex.size(),
- NiceBytes(CachedBytes),
- NiceTimeSpanMs(CacheTimer.GetElapsedTimeMs()));
- }
-
- if (AbortFlag)
- {
- return;
}
-
- CleanDirectory(Path, DefaultExcludeFolders);
-
Stopwatch CacheMappingTimer;
std::atomic<uint64_t> BytesWritten = 0;
uint64_t CacheMappedBytesForReuse = 0;
- std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false);
- std::vector<std::atomic<bool>> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false);
+ std::vector<bool> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
// Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
struct CacheCopyData
{
- std::filesystem::path OriginalSourceFileName;
- IoHash LocalFileRawHash;
- uint64_t LocalFileRawSize = 0;
- std::vector<uint32_t> RemotePathIndexes;
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkSourcePtrs;
+ uint32_t LocalPathIndex;
+ std::vector<const ChunkedContentLookup::ChunkLocation*> TargetChunkLocationPtrs;
struct ChunkTarget
{
- uint32_t ChunkSourceCount;
+ uint32_t TargetChunkLocationCount;
uint64_t ChunkRawSize;
- uint64_t LocalFileOffset;
+ uint64_t CacheFileOffset;
};
std::vector<ChunkTarget> ChunkTargets;
};
@@ -3426,42 +3420,24 @@ namespace {
std::vector<CacheCopyData> CacheCopyDatas;
uint32_t ChunkCountToWrite = 0;
- // Pick up all whole files to copy and/or move
+ // Pick up all whole files we can use from current local state
for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
{
const IoHash& RemoteRawHash = RemoteContent.RawHashes[RemotePathIndex];
- if (auto It = LocalRawHashToPathIndex.find(RemoteRawHash); It != LocalRawHashToPathIndex.end())
+ if (auto It = RawHashToLocalPathIndex.find(RemoteRawHash); It != RawHashToLocalPathIndex.end())
{
- if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(RemoteRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end())
- {
- CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
- Data.RemotePathIndexes.push_back(RemotePathIndex);
- }
- else
- {
- const uint32_t LocalPathIndex = It->second;
- ZEN_ASSERT(LocalContent.RawSizes[LocalPathIndex] == RemoteContent.RawSizes[RemotePathIndex]);
- ZEN_ASSERT(LocalContent.RawHashes[LocalPathIndex] == RemoteContent.RawHashes[RemotePathIndex]);
- RawHashToCacheCopyDataIndex.insert_or_assign(RemoteRawHash, CacheCopyDatas.size());
- CacheCopyDatas.push_back(CacheCopyData{.OriginalSourceFileName = LocalContent.Paths[LocalPathIndex],
- .LocalFileRawHash = RemoteRawHash,
- .LocalFileRawSize = LocalContent.RawSizes[LocalPathIndex],
- .RemotePathIndexes = {RemotePathIndex}});
- CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex];
- ChunkCountToWrite++;
- }
RemotePathIndexWantsCopyFromCacheFlags[RemotePathIndex] = true;
+ CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex];
}
}
- // Pick up all chunks in cached files and make sure we block moving of cache files if we need part of them
- for (auto& CachedLocalFile : LocalRawHashToPathIndex)
+ // Pick up all chunks in current local state
+ for (auto& CachedLocalFile : RawHashToLocalPathIndex)
{
const IoHash& LocalFileRawHash = CachedLocalFile.first;
const uint32_t LocalPathIndex = CachedLocalFile.second;
const uint32_t LocalSequenceRawHashIndex = LocalLookup.RawHashToSequenceRawHashIndex.at(LocalFileRawHash);
- const uint32_t LocalOrderOffset =
- LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex]; // CachedLocalFile.second.ChunkOrderOffset;
+ const uint32_t LocalOrderOffset = LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex];
{
uint64_t SourceOffset = 0;
@@ -3482,30 +3458,30 @@ namespace {
if (!ChunkTargetPtrs.empty())
{
- CacheCopyData::ChunkTarget Target = {.ChunkSourceCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .ChunkRawSize = LocalChunkRawSize,
- .LocalFileOffset = SourceOffset};
+ CacheCopyData::ChunkTarget Target = {
+ .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .ChunkRawSize = LocalChunkRawSize,
+ .CacheFileOffset = SourceOffset};
if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalFileRawHash);
CopySourceIt != RawHashToCacheCopyDataIndex.end())
{
CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
- Data.ChunkSourcePtrs.insert(Data.ChunkSourcePtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end());
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
Data.ChunkTargets.push_back(Target);
}
else
{
RawHashToCacheCopyDataIndex.insert_or_assign(LocalFileRawHash, CacheCopyDatas.size());
CacheCopyDatas.push_back(
- CacheCopyData{.OriginalSourceFileName = LocalContent.Paths[LocalPathIndex],
- .LocalFileRawHash = LocalFileRawHash,
- .LocalFileRawSize = LocalContent.RawSizes[LocalPathIndex],
- .RemotePathIndexes = {},
- .ChunkSourcePtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
+ CacheCopyData{.LocalPathIndex = LocalPathIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
}
CacheMappedBytesForReuse += LocalChunkRawSize;
+ RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true;
}
- RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true;
}
}
SourceOffset += LocalChunkRawSize;
@@ -3535,531 +3511,554 @@ namespace {
ZEN_CONSOLE("Mapped {} cached data for reuse in {}",
NiceBytes(CacheMappedBytesForReuse),
NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs()));
+ {
+ WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- auto CopyChunksFromCacheFile = [](const std::filesystem::path& Path,
- BufferedOpenFile& SourceFile,
- WriteFileCache& OpenFileCache,
- const ChunkedFolderContent& RemoteContent,
- const uint64_t LocalFileSourceOffset,
- const uint64_t LocalChunkRawSize,
- std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargetPtrs,
- uint64_t& OutBytesWritten) {
- CompositeBuffer Chunk = SourceFile.GetRange(LocalFileSourceOffset, LocalChunkRawSize);
- uint64_t TotalBytesWritten = 0;
-
- WriteChunkToDisk(Path, RemoteContent, ChunkTargetPtrs, Chunk, OpenFileCache, TotalBytesWritten);
- OutBytesWritten += TotalBytesWritten;
- };
-
- auto CloneFullFileFromCache = [](const std::filesystem::path& Path,
- const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const IoHash& FileRawHash,
- const uint64_t FileRawSize,
- std::span<const uint32_t> FullCloneRemotePathIndexes,
- bool CanMove,
- uint64_t& OutBytesWritten) {
- const std::filesystem::path CacheFilePath = (CacheFolderPath / FileRawHash.ToHexString()).make_preferred();
+ ProgressBar WriteProgressBar(UsePlainProgress);
+ ParallellWork Work(AbortFlag);
- size_t CopyCount = FullCloneRemotePathIndexes.size();
- if (CanMove)
- {
- // If every reference to this chunk has are full files we can move the cache file to the last target
- CopyCount--;
- }
+ std::atomic<uint64_t> BytesDownloaded = 0;
- for (uint32_t RemotePathIndex : FullCloneRemotePathIndexes)
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
{
- const std::filesystem::path TargetPath = (Path / RemoteContent.Paths[RemotePathIndex]).make_preferred();
- CreateDirectories(TargetPath.parent_path());
-
- if (CopyCount == 0)
- {
- std::filesystem::rename(CacheFilePath, TargetPath);
- }
- else
+ if (AbortFlag)
{
- CopyFile(CacheFilePath, TargetPath, {.EnableClone = false});
- ZEN_ASSERT(CopyCount > 0);
- CopyCount--;
+ break;
}
- OutBytesWritten += FileRawSize;
- }
- };
-
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
-
- ProgressBar WriteProgressBar(UsePlainProgress);
- ParallellWork Work(AbortFlag);
-
- std::atomic<uint64_t> BytesDownloaded = 0;
-
- for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
- [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- const std::filesystem::path CacheFilePath =
- (CacheFolderPath / CopyData.LocalFileRawHash.ToHexString()).make_preferred();
- if (!CopyData.ChunkSourcePtrs.empty())
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(),//
+ [&, CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- uint64_t CacheLocalFileBytesRead = 0;
-
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets(CopyData.ChunkSourcePtrs);
-
- struct WriteOp
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
+ const std::filesystem::path LocalFilePath =
+ (Path / LocalContent.Paths[CopyData.LocalPathIndex]).make_preferred();
+ if (!CopyData.TargetChunkLocationPtrs.empty())
{
- const ChunkedContentLookup::ChunkLocation* Target;
- uint64_t LocalFileOffset;
- uint64_t ChunkSize;
- };
+ uint64_t CacheLocalFileBytesRead = 0;
- std::vector<WriteOp> WriteOps;
- WriteOps.reserve(CopyData.ChunkSourcePtrs.size());
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
- {
- std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.ChunkSourceCount);
- for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange)
+ struct WriteOp
{
- WriteOps.push_back(WriteOp{.Target = Target,
- .LocalFileOffset = ChunkTarget.LocalFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.ChunkSourceCount;
- }
+ const ChunkedContentLookup::ChunkLocation* Target;
+ uint64_t CacheFileOffset;
+ uint64_t ChunkSize;
+ };
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->PathIndex < Rhs.Target->PathIndex)
+ std::vector<WriteOp> WriteOps;
+ WriteOps.reserve(AllTargets.size());
+
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
{
- return true;
+ std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
}
- else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex)
- {
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->PathIndex < Rhs.Target->PathIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
return false;
+ });
+
+ {
+ BufferedOpenFile SourceFile(LocalFilePath);
+ WriteFileCache OpenFileCache;
+ for (const WriteOp& Op : WriteOps)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const uint32_t RemotePathIndex = Op.Target->PathIndex;
+ const uint64_t ChunkSize = Op.ChunkSize;
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
+
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemotePathIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t TargetIndex) {
+ return (CacheFolderPath / RemoteContent.RawHashes[TargetIndex].ToHexString())
+ .make_preferred();
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ BytesWritten += ChunkSize;
+ WriteToDiskBytes += ChunkSize;
+ CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
+ }
}
- if (Lhs.Target->Offset < Rhs.Target->Offset)
+ if (!AbortFlag)
{
- return true;
+ ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
+ ZEN_DEBUG("Copied {} from {}",
+ NiceBytes(CacheLocalFileBytesRead),
+ LocalContent.Paths[CopyData.LocalPathIndex]);
}
- return false;
- });
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
- BufferedOpenFile SourceFile(CacheFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
- {
- if (AbortFlag)
+ for (const IoHash ChunkHash : LooseChunkHashes)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash);
+ if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex])
+ {
+ ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ continue;
+ }
+ bool NeedsCopy = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
+ {
+ std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex);
+
+ if (ChunkTargetPtrs.empty())
+ {
+ ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- break;
- }
- const uint32_t RemotePathIndex = Op.Target->PathIndex;
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.LocalFileOffset, ChunkSize);
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ {
+ DownloadLargeBlob(Storage,
+ Path / ZenTempChunkFolderName,
+ CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ ChunkTargetPtrs,
+ Work,
+ WritePool,
+ NetworkPool,
+ BytesWritten,
+ WriteToDiskBytes,
+ BytesDownloaded,
+ LooseChunksBytes,
+ DownloadedChunks,
+ ChunkCountWritten,
+ MultipartAttachmentCount);
+ }
+ else
+ {
+ IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ BytesDownloaded += CompressedPart.GetSize();
+ LooseChunksBytes += CompressedPart.GetSize();
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)),
+ Path / ZenTempChunkFolderName,
+ ChunkHash);
+ DownloadedChunks++;
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(Payload)](
+ std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ uint64_t TotalBytesWritten = 0;
+ SharedBuffer Chunk =
+ Decompress(CompressedPart,
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemotePathIndex,
- [&Path, &RemoteContent](uint32_t TargetIndex) {
- return (Path / RemoteContent.Paths[TargetIndex]).make_preferred();
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- BytesWritten += ChunkSize;
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
- }
- if (!AbortFlag)
- {
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), CopyData.OriginalSourceFileName);
- }
- }
+ {
+ WriteFileCache OpenFileCache;
+ WriteChunkToDisk(CacheFolderPath,
+ RemoteContent,
+ ChunkTargetPtrs,
+ CompositeBuffer(Chunk),
+ OpenFileCache,
+ TotalBytesWritten);
+ }
+ ChunkCountWritten++;
+ BytesWritten += TotalBytesWritten;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ }
- if (CopyData.RemotePathIndexes.empty())
- {
- std::filesystem::remove(CacheFilePath);
- }
- else if (!AbortFlag)
+ size_t BlockCount = BlockDescriptions.size();
+ std::atomic<size_t> BlocksComplete = 0;
+
+ auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags](
+ const ChunkBlockDescription& BlockDescription) -> bool {
+ for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes)
+ {
+ if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
{
- uint64_t LocalBytesWritten = 0;
- CloneFullFileFromCache(Path,
- CacheFolderPath,
- RemoteContent,
- CopyData.LocalFileRawHash,
- CopyData.LocalFileRawSize,
- CopyData.RemotePathIndexes,
- true,
- LocalBytesWritten);
- // CacheLocalFileBytesRead += CopyData.LocalFileRawSize;
- BytesWritten += LocalBytesWritten;
- WriteToDiskBytes += LocalBytesWritten;
- ChunkCountWritten++;
-
- ZEN_DEBUG("Used full cached file {} ({}) for {} ({}) targets",
- CopyData.OriginalSourceFileName,
- NiceBytes(CopyData.LocalFileRawSize),
- CopyData.RemotePathIndexes.size(),
- NiceBytes(LocalBytesWritten));
+ return true;
}
}
- },
- Work.DefaultErrorFunction());
- }
-
- for (const IoHash ChunkHash : LooseChunkHashes)
- {
- if (AbortFlag)
- {
- break;
- }
+ }
+ return false;
+ };
- uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash);
- if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex])
+ size_t BlocksNeededCount = 0;
+ for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
- continue;
- }
- bool NeedsCopy = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
- {
- std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex);
-
- if (ChunkTargetPtrs.empty())
+ if (Work.IsAborted())
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ break;
}
- else
+ if (IsBlockNeeded(BlockDescriptions[BlockIndex]))
{
+ BlocksNeededCount++;
Work.ScheduleWork(
NetworkPool,
- [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
+ [&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
+ if (!BlockBuffer)
{
- DownloadLargeBlob(Storage,
- Path,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- Work,
- WritePool,
- NetworkPool,
- BytesWritten,
- WriteToDiskBytes,
- BytesDownloaded,
- LooseChunksBytes,
- DownloadedChunks,
- ChunkCountWritten,
- MultipartAttachmentCount);
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash));
}
- else
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescriptions[BlockIndex].BlockHash);
+
+ if (!AbortFlag)
{
- IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- BytesDownloaded += CompressedPart.GetSize();
- LooseChunksBytes += CompressedPart.GetSize();
- DownloadedChunks++;
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ IoHash BlockRawHash;
+ uint64_t BlockRawSize;
+ CompressedBuffer CompressedBlockBuffer =
+ CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize);
+ if (!CompressedBlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is not a compressed buffer",
+ BlockDescriptions[BlockIndex].BlockHash));
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(CompressedPart)](
- std::atomic<bool>&) {
- if (!AbortFlag)
+ if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash)
+ {
+ throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}",
+ BlockDescriptions[BlockIndex].BlockHash,
+ BlockRawHash));
+ }
+
+ CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
+ if (!DecompressedBlockBuffer)
{
- uint64_t TotalBytesWritten = 0;
- SharedBuffer Chunk =
- Decompress(CompressedPart,
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
- WriteFileCache OpenFileCache;
-
- WriteChunkToDisk(Path,
+ throw std::runtime_error(fmt::format("Block {} failed to decompress",
+ BlockDescriptions[BlockIndex].BlockHash));
+ }
+
+ ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash ==
+ IoHash::HashBuffer(DecompressedBlockBuffer));
+
+ uint64_t BytesWrittenToDisk = 0;
+ uint32_t ChunksReadFromBlock = 0;
+ if (WriteBlockToDisk(CacheFolderPath,
RemoteContent,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- TotalBytesWritten);
- ChunkCountWritten++;
- BytesWritten += TotalBytesWritten;
- WriteToDiskBytes += TotalBytesWritten;
+ RemotePathIndexWantsCopyFromCacheFlags,
+ DecompressedBlockBuffer,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
+ ChunksReadFromBlock,
+ BytesWrittenToDisk))
+ {
+ BytesWritten += BytesWrittenToDisk;
+ WriteToDiskBytes += BytesWrittenToDisk;
+ ChunkCountWritten += ChunksReadFromBlock;
}
- },
- Work.DefaultErrorFunction());
- }
+ else
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash));
+ }
+ BlocksComplete++;
+ }
+ },
+ [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
+ ZEN_ERROR("Failed writing block {}. Reason: {}",
+ BlockDescriptions[BlockIndex].BlockHash,
+ Ex.what());
+ AbortFlag = true;
+ });
}
}
},
Work.DefaultErrorFunction());
}
+ else
+ {
+ ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ }
}
+
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
+ WriteProgressBar.UpdateState(
+ {.Task = "Writing chunks ",
+ .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}",
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
+ BlocksComplete.load(),
+ BlocksNeededCount,
+ NiceBytes(BytesDownloaded.load()),
+ NiceBytes(BytesWritten.load())),
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
+ false);
+ });
+
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ WriteProgressBar.Finish();
}
- size_t BlockCount = BlockDescriptions.size();
- std::atomic<size_t> BlocksComplete = 0;
+ std::vector<std::pair<IoHash, uint32_t>> Targets;
+ Targets.reserve(RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
+ return Lhs.first < Rhs.first;
+ });
+
+ // Move all files we will reuse to cache folder
+ for (auto It : RawHashToLocalPathIndex)
+ {
+ const IoHash& RawHash = It.first;
+ if (RemoteLookup.RawHashToSequenceRawHashIndex.contains(RawHash))
+ {
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[It.second]).make_preferred();
+ const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath));
+ SetFileReadOnly(LocalFilePath, false);
+ std::filesystem::rename(LocalFilePath, CacheFilePath);
+ }
+ }
- auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags](
- const ChunkBlockDescription& BlockDescription) -> bool {
- for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes)
+ if (WipeTargetFolder)
+ {
+ // Clean target folder
+ ZEN_CONSOLE("Wiping {}", Path);
+ CleanDirectory(Path, DefaultExcludeFolders);
+ }
+ else
+ {
+ // Remove unused tracked files
+ tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
+ RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
+ }
+ std::vector<std::filesystem::path> LocalFilesToRemove;
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
+ if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string()))
{
- const uint32_t RemoteChunkIndex = It->second;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ if (std::filesystem::exists(LocalFilePath))
{
- return true;
+ LocalFilesToRemove.emplace_back(std::move(LocalFilePath));
}
}
}
- return false;
- };
-
- for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
- {
- if (Work.IsAborted())
+ if (!LocalFilesToRemove.empty())
{
- break;
+ ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path);
+ for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove)
+ {
+ SetFileReadOnly(LocalFilePath, false);
+ std::filesystem::remove(LocalFilePath);
+ }
}
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- if (IsBlockNeeded(BlockDescriptions[BlockIndex]))
- {
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash));
- }
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- IoHash BlockRawHash;
- uint64_t BlockRawSize;
- CompressedBuffer CompressedBlockBuffer =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)),
- BlockRawHash,
- BlockRawSize);
- if (!CompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is not a compressed buffer",
- BlockDescriptions[BlockIndex].BlockHash));
- }
+ {
+ WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash)
- {
- throw std::runtime_error(
- fmt::format("Block {} header has a mismatching raw hash {}",
- BlockDescriptions[BlockIndex].BlockHash,
- BlockRawHash));
- }
+ ProgressBar RebuildProgressBar(UsePlainProgress);
+ ParallellWork Work(AbortFlag);
- CompositeBuffer DecompressedBlockBuffer =
- CompressedBlockBuffer.DecompressToComposite();
- if (!DecompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} failed to decompress",
- BlockDescriptions[BlockIndex].BlockHash));
- }
+ OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
+ OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
+ OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size());
+ OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size());
- ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash ==
- IoHash::HashBuffer(DecompressedBlockBuffer));
+ std::atomic<uint64_t> TargetsComplete = 0;
- uint64_t BytesWrittenToDisk = 0;
- uint32_t ChunksReadFromBlock = 0;
- if (WriteBlockToDisk(Path,
- RemoteContent,
- RemotePathIndexWantsCopyFromCacheFlags,
- DecompressedBlockBuffer,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
- ChunksReadFromBlock,
- BytesWrittenToDisk))
- {
- BytesWritten += BytesWrittenToDisk;
- WriteToDiskBytes += BytesWrittenToDisk;
- ChunkCountWritten += ChunksReadFromBlock;
- }
- else
- {
- throw std::runtime_error(fmt::format("Block {} is malformed",
- BlockDescriptions[BlockIndex].BlockHash));
- }
- BlocksComplete++;
- }
- },
- [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
- ZEN_ERROR("Failed writing block {}. Reason: {}",
- BlockDescriptions[BlockIndex].BlockHash,
- Ex.what());
- AbortFlag = true;
- });
- }
- }
- },
- Work.DefaultErrorFunction());
- }
- else
- {
- ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
- BlocksComplete++;
- }
- }
- },
- Work.DefaultErrorFunction());
- }
- for (uint32_t PathIndex = 0; PathIndex < RemoteContent.Paths.size(); PathIndex++)
- {
- if (Work.IsAborted())
- {
- break;
- }
- if (RemoteContent.RawSizes[PathIndex] == 0)
+ size_t TargetOffset = 0;
+ while (TargetOffset < Targets.size())
{
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ size_t TargetCount = 1;
+ const IoHash& RawHash = Targets[TargetOffset].first;
+ while (Targets[TargetOffset + TargetCount].first == RawHash)
+ {
+ TargetCount++;
+ }
+
Work.ScheduleWork(
- WritePool,
- [&, PathIndex](std::atomic<bool>&) {
+ WritePool, // GetSyncWorkerPool(),//
+ [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
if (!AbortFlag)
{
- const std::filesystem::path TargetPath = (Path / RemoteContent.Paths[PathIndex]).make_preferred();
- CreateDirectories(TargetPath.parent_path());
- BasicFile OutputFile;
- OutputFile.Open(TargetPath, BasicFile::Mode::kTruncate);
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].first;
+ const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second;
+ const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex];
+ OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex];
+ const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
+ if (RawHash == IoHash::Zero)
+ {
+ if (std::filesystem::exists(FirstTargetFilePath))
+ {
+ SetFileReadOnly(FirstTargetFilePath, false);
+ }
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ {
+ BasicFile OutputFile;
+ OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate);
+ }
+ }
+ else
+ {
+ const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath));
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ if (std::filesystem::exists(FirstTargetFilePath))
+ {
+ SetFileReadOnly(FirstTargetFilePath, false);
+ }
+ std::filesystem::rename(CacheFilePath, FirstTargetFilePath);
+ }
+
+ OutLocalFolderState.Attributes[FirstTargetPathIndex] =
+ RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[FirstTargetPathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
+ {
+ ZEN_ASSERT(Targets[TargetOffset].first == RawHash);
+ ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath));
+ const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second;
+ const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex];
+ const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred();
+ OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath;
+ OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex];
+ CreateDirectories(ExtraTargetFilePath.parent_path());
+ if (std::filesystem::exists(ExtraTargetFilePath))
+ {
+ SetFileReadOnly(ExtraTargetFilePath, false);
+ }
+ CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false});
+
+ OutLocalFolderState.Attributes[ExtraTargetPathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(ExtraTargetFilePath)
+ : SetNativeFileAttributes(ExtraTargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[ExtraTargetPathIndex]);
+ OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] =
+ GetModificationTickFromPath(ExtraTargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
}
},
Work.DefaultErrorFunction());
- }
- }
- Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- WriteProgressBar.UpdateState(
- {.Task = "Writing chunks ",
- .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}",
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- BlocksComplete.load(),
- BlockCount,
- NiceBytes(BytesDownloaded.load()),
- NiceBytes(BytesWritten.load())),
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
- });
-
- if (AbortFlag)
- {
- return;
- }
+ TargetOffset += TargetCount;
+ }
- WriteProgressBar.Finish();
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ RebuildProgressBar.UpdateState(
+ {.Task = "Rebuilding state ",
+ .Details = fmt::format("Written {} files out of {}", TargetsComplete.load(), Targets.size()),
+ .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
+ .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
+ false);
+ });
- {
- ProgressBar PremissionsProgressBar(false);
- if (!RemoteContent.Attributes.empty())
+ if (AbortFlag)
{
- auto SetNativeFileAttributes =
- [](const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) -> uint32_t {
-#if ZEN_PLATFORM_WINDOWS
- if (SourcePlatform == SourcePlatform::Windows)
- {
- SetFileAttributes(FilePath, Attributes);
- return Attributes;
- }
- else
- {
- uint32_t CurrentAttributes = GetFileAttributes(FilePath);
- uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes));
- if (CurrentAttributes != NewAttributes)
- {
- SetFileAttributes(FilePath, NewAttributes);
- }
- return NewAttributes;
- }
-#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- if (SourcePlatform != SourcePlatform::Windows)
- {
- SetFileMode(FilePath, Attributes);
- return Attributes;
- }
- else
- {
- uint32_t CurrentMode = GetFileMode(FilePath);
- uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes));
- if (CurrentMode != NewMode)
- {
- SetFileMode(FilePath, NewMode);
- }
- return NewMode;
- }
-#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- };
-
- OutLocalFolderState.Paths.reserve(RemoteContent.Paths.size());
- OutLocalFolderState.RawSizes.reserve(RemoteContent.Paths.size());
- OutLocalFolderState.Attributes.reserve(RemoteContent.Paths.size());
- OutLocalFolderState.ModificationTicks.reserve(RemoteContent.Paths.size());
- for (uint32_t PathIndex = 0; PathIndex < RemoteContent.Paths.size(); PathIndex++)
- {
- const std::filesystem::path LocalFilePath = (Path / RemoteContent.Paths[PathIndex]);
- const uint32_t CurrentPlatformAttributes =
- SetNativeFileAttributes(LocalFilePath, RemoteContent.Platform, RemoteContent.Attributes[PathIndex]);
-
- OutLocalFolderState.Paths.push_back(RemoteContent.Paths[PathIndex]);
- OutLocalFolderState.RawSizes.push_back(RemoteContent.RawSizes[PathIndex]);
- OutLocalFolderState.Attributes.push_back(CurrentPlatformAttributes);
- OutLocalFolderState.ModificationTicks.push_back(GetModificationTickFromPath(LocalFilePath));
-
- PremissionsProgressBar.UpdateState(
- {.Task = "Set permissions ",
- .Details = fmt::format("Updated {} files out of {}", PathIndex, RemoteContent.Paths.size()),
- .TotalCount = RemoteContent.Paths.size(),
- .RemainingCount = (RemoteContent.Paths.size() - PathIndex)},
- false);
- }
+ return;
}
- PremissionsProgressBar.Finish();
+
+ RebuildProgressBar.Finish();
}
}
@@ -4077,7 +4076,7 @@ namespace {
CbObject BuildObject = Storage.GetBuild(BuildId);
ZEN_CONSOLE("GetBuild took {}. Payload size: {}",
- NiceLatencyNs(GetBuildTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()),
NiceBytes(BuildObject.GetSize()));
CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
@@ -4162,7 +4161,7 @@ namespace {
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
BuildParts[0].first,
BuildParts[0].second,
- NiceLatencyNs(GetBuildPartTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()),
NiceBytes(BuildPartManifest.GetSize()));
{
@@ -4202,7 +4201,7 @@ namespace {
OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes);
ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
BuildPartId,
- NiceLatencyNs(GetBlockMetadataTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
OutBlockDescriptions.size());
if (OutBlockDescriptions.size() != BlockRawHashes.size())
@@ -4309,7 +4308,7 @@ namespace {
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
OverlayBuildPartId,
OverlayBuildPartName,
- NiceLatencyNs(GetOverlayBuildPartTimer.GetElapsedTimeUs() * 1000),
+ NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()),
NiceBytes(OverlayBuildPartManifest.GetSize()));
ChunkedFolderContent OverlayPartContent;
@@ -4457,6 +4456,7 @@ namespace {
if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent))
{
+ const size_t LocaStatePathCount = LocalFolderState.Paths.size();
std::vector<std::filesystem::path> DeletedPaths;
FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, CurrentLocalFolderContent, DeletedPaths);
if (!DeletedPaths.empty())
@@ -4464,9 +4464,10 @@ namespace {
LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
}
- ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated",
+ ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}",
DeletedPaths.size(),
- UpdatedContent.Paths.size());
+ UpdatedContent.Paths.size(),
+ LocaStatePathCount);
if (UpdatedContent.Paths.size() > 0)
{
uint64_t ByteCountToScan = 0;
@@ -4535,7 +4536,7 @@ namespace {
ZEN_CONSOLE("Using cached local state");
}
- ZEN_CONSOLE("Read local state in {}", NiceLatencyNs(ReadStateTimer.GetElapsedTimeUs() * 1000));
+ ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
ScanContent = false;
}
}
@@ -4610,7 +4611,7 @@ namespace {
});
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName);
- CreateDirectories(Path / ZenTempReuseFolderName);
+ CreateDirectories(Path / ZenTempCacheFolderName);
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -4698,7 +4699,7 @@ namespace {
if (CompareContent(RemoteContent, LocalContent))
{
ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.",
- NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000));
+ NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
}
else
{
@@ -4730,7 +4731,7 @@ namespace {
CreateDirectories((Path / ZenStateFilePath).parent_path());
TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView());
- ZEN_CONSOLE("Wrote local state in {}", NiceLatencyNs(WriteStateTimer.GetElapsedTimeUs() * 1000));
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
#if 0
ExtendableStringBuilder<1024> SB;
@@ -4738,7 +4739,7 @@ namespace {
WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
- ZEN_CONSOLE("Downloaded build in {}.", NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000));
+ ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
}
}
}
@@ -5095,13 +5096,13 @@ BuildsCommand::BuildsCommand()
"Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded",
cxxopts::value(m_BuildPartIds),
"<id>");
- m_DownloadOptions.add_option(
- "",
- "",
- "build-part-name",
- "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded",
- cxxopts::value(m_BuildPartNames),
- "<name>");
+ m_DownloadOptions.add_option("",
+ "",
+ "build-part-name",
+ "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given "
+ "all parts will be downloaded",
+ cxxopts::value(m_BuildPartNames),
+ "<name>");
m_DownloadOptions
.add_option("", "", "clean", "Delete all data in target folder before downloading", cxxopts::value(m_Clean), "<clean>");
m_DownloadOptions.add_option("",
@@ -5841,7 +5842,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork);
});
ZEN_ASSERT(!AbortFlag.load());
- ZEN_CONSOLE("Scrambled files in {}", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_CONSOLE("Scrambled files in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
};
ScrambleDir(DownloadPath);
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 8279fb952..85feab2f7 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -531,7 +531,10 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath)
}
void
-CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options, std::error_code& OutErrorCode)
+CopyFile(const std::filesystem::path& FromPath,
+ const std::filesystem::path& ToPath,
+ const CopyFileOptions& Options,
+ std::error_code& OutErrorCode)
{
OutErrorCode.clear();
@@ -544,7 +547,7 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop
}
bool
-CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options)
+CopyFile(const std::filesystem::path& FromPath, const std::filesystem::path& ToPath, const CopyFileOptions& Options)
{
bool Success = false;
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index 20f6dc56c..e020668fc 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -97,11 +97,11 @@ struct CopyFileOptions
bool MustClone = false;
};
-ZENCORE_API bool CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options);
-ZENCORE_API void CopyFile(std::filesystem::path FromPath,
- std::filesystem::path ToPath,
- const CopyFileOptions& Options,
- std::error_code& OutError);
+ZENCORE_API bool CopyFile(const std::filesystem::path& FromPath, const std::filesystem::path& ToPath, const CopyFileOptions& Options);
+ZENCORE_API void CopyFile(const std::filesystem::path& FromPath,
+ const std::filesystem::path& ToPath,
+ const CopyFileOptions& Options,
+ std::error_code& OutError);
ZENCORE_API void CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options);
ZENCORE_API bool SupportsBlockRefCounting(std::filesystem::path Path);
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 78ebcdd55..a4bb759e7 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -336,7 +336,8 @@ public:
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (std::filesystem::is_regular_file(BlockPath))
{
- IoBuffer Payload = ReadFile(BlockPath).Flatten();
+ BasicFile File(BlockPath, BasicFile::Mode::kRead);
+ IoBuffer Payload = File.ReadAll();
ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
m_Stats.TotalBytesRead += Payload.GetSize();
Payload.SetContentType(ZenContentType::kCompressedBinary);
@@ -365,13 +366,13 @@ public:
struct WorkloadData
{
std::atomic<uint64_t> BytesRemaining;
- IoBuffer BlobFile;
+ BasicFile BlobFile;
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->BlobFile = IoBufferBuilder::MakeFromFile(BlockPath);
- const uint64_t BlobSize = Workload->BlobFile.GetSize();
+ Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
+ const uint64_t BlobSize = Workload->BlobFile.FileSize();
Workload->Receiver = std::move(Receiver);
Workload->BytesRemaining = BlobSize;
@@ -383,7 +384,8 @@ public:
uint64_t Size = Min(ChunkSize, BlobSize - Offset);
WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
SimulateLatency(0, 0);
- IoBuffer PartPayload(Workload->BlobFile, Offset, Size);
+ IoBuffer PartPayload(Size);
+ Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
m_Stats.TotalBytesRead += PartPayload.GetSize();
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
Workload->Receiver(Offset, PartPayload, ByteRemaning);