aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md8
-rw-r--r--src/zen/cmds/admin_cmd.cpp19
-rw-r--r--src/zen/cmds/admin_cmd.h8
-rw-r--r--src/zen/cmds/builds_cmd.cpp1765
-rw-r--r--src/zen/cmds/builds_cmd.h38
-rw-r--r--src/zen/cmds/status_cmd.cpp9
-rw-r--r--src/zen/cmds/status_cmd.h6
-rw-r--r--src/zen/cmds/up_cmd.cpp43
-rw-r--r--src/zen/cmds/up_cmd.h28
-rw-r--r--src/zen/cmds/workspaces_cmd.cpp91
-rw-r--r--src/zen/cmds/workspaces_cmd.h34
-rw-r--r--src/zen/zen.cpp18
-rw-r--r--src/zencore/basicfile.cpp6
-rw-r--r--src/zencore/compress.cpp130
-rw-r--r--src/zencore/filesystem.cpp13
-rw-r--r--src/zencore/include/zencore/compress.h18
-rw-r--r--src/zencore/include/zencore/filesystem.h2
-rw-r--r--src/zencore/include/zencore/process.h3
-rw-r--r--src/zencore/process.cpp142
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h16
20 files changed, 1506 insertions, 891 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index febd32eb4..8ff45befc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,10 @@
##
- Bugfix: GetModificationTickFromPath and CopyFile now works correctly on Windows/Mac
+- Bugfix: Handling of quotes and quotes with leading backslash for command line parsing - UE-231677
- Improvement: When logging with a epoch time prefix, the milliseconds/fraction is now correct. We now also set the epoch to the process spawn time rather than the time when the logger is created
- **EXPERIMENTAL** `zen builds`
- Improvement: Do partial requests of blocks if not all of the block is needed
- - Improvement: Better progress/statistics on download
+ - Improvement: Better progress/statistics on upload and download
- Improvement: Scavenge .zen temp folders for existing data (downloaded, decompressed or written) from previous failed run
- Improvement: Faster abort during stream compression
- Improvement: Try to move downloaded blobs with rename if possible avoiding an extra disk write
@@ -17,6 +18,11 @@
- Improvement: More trace scopes for build upload operations
- Improvement: Progress bar automatically switches to plain mode when stdout is not a console
- Improvement: Progress bar is much more efficient on Windows (switched away from printf)
+ - Improvement: Improved stats output at end of upload and download operations
+ - Improvement: Reduced disk I/O when writing out chunk blocks during download
+ - Improvement: Collapse consecutive ranges when reading/writing data from local cache state
+ - Improvement: If a chunk or block write operation results in more than one completed chunk sequence, do the additional verifications as async work
+ - Improvement: Improved error reporting when async tasks fail
- Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase
- Bugfix: Fixed crash during download when trying to write outside a file range
- Bugfix: MacOS / Linux zen build download now works correctly
diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp
index 995ed4136..835e01151 100644
--- a/src/zen/cmds/admin_cmd.cpp
+++ b/src/zen/cmds/admin_cmd.cpp
@@ -714,26 +714,29 @@ CopyStateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw OptionParseException("data path must be given");
}
- if (!std::filesystem::is_directory(m_DataPath))
+ std::filesystem::path DataPath = StringToPath(m_DataPath);
+ std::filesystem::path TargetPath = StringToPath(m_TargetPath);
+
+ if (!std::filesystem::is_directory(DataPath))
{
throw OptionParseException("data path must exist");
}
- if (m_TargetPath.empty())
+ if (TargetPath.empty())
{
throw OptionParseException("target path must be given");
}
- std::filesystem::path RootManifestPath = m_DataPath / "root_manifest";
- std::filesystem::path TargetRootManifestPath = m_TargetPath / "root_manifest";
+ std::filesystem::path RootManifestPath = DataPath / "root_manifest";
+ std::filesystem::path TargetRootManifestPath = TargetPath / "root_manifest";
if (!TryCopy(RootManifestPath, TargetRootManifestPath))
{
throw OptionParseException("data path is invalid, missing root_manifest");
}
- std::filesystem::path CachePath = m_DataPath / "cache";
- std::filesystem::path TargetCachePath = m_TargetPath / "cache";
+ std::filesystem::path CachePath = DataPath / "cache";
+ std::filesystem::path TargetCachePath = TargetPath / "cache";
// Copy cache state
DirectoryContent CacheDirectoryContent;
@@ -778,8 +781,8 @@ CopyStateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path CasPath = m_DataPath / "cas";
- std::filesystem::path TargetCasPath = m_TargetPath / "cas";
+ std::filesystem::path CasPath = DataPath / "cas";
+ std::filesystem::path TargetCasPath = TargetPath / "cas";
{
std::filesystem::path UCasRootPath = CasPath / ".ucas_root";
diff --git a/src/zen/cmds/admin_cmd.h b/src/zen/cmds/admin_cmd.h
index c593b2cac..8b6d3e258 100644
--- a/src/zen/cmds/admin_cmd.h
+++ b/src/zen/cmds/admin_cmd.h
@@ -155,10 +155,10 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{"copy-state", "Copy zen server disk state"};
- std::filesystem::path m_DataPath;
- std::filesystem::path m_TargetPath;
- bool m_SkipLogs = false;
+ cxxopts::Options m_Options{"copy-state", "Copy zen server disk state"};
+ std::string m_DataPath;
+ std::string m_TargetPath;
+ bool m_SkipLogs = false;
};
} // namespace zen
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 4b7c7fa8a..5ed3642d8 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -569,6 +569,64 @@ namespace {
}
};
+ struct CacheMappingStatistics
+ {
+ uint64_t CacheChunkCount = 0;
+ uint64_t CacheChunkByteCount = 0;
+
+ uint64_t CacheBlockCount = 0;
+ uint64_t CacheBlocksByteCount = 0;
+
+ uint64_t CacheSequenceHashesCount = 0;
+ uint64_t CacheSequenceHashesByteCount = 0;
+
+ uint32_t LocalPathsMatchingSequencesCount = 0;
+ uint64_t LocalPathsMatchingSequencesByteCount = 0;
+
+ uint64_t LocalChunkMatchingRemoteCount = 0;
+ uint64_t LocalChunkMatchingRemoteByteCount = 0;
+ };
+
+ struct DownloadStatistics
+ {
+ std::atomic<uint64_t> RequestsCompleteCount = 0;
+
+ std::atomic<uint64_t> DownloadedChunkCount = 0;
+ std::atomic<uint64_t> DownloadedChunkByteCount = 0;
+ std::atomic<uint64_t> MultipartAttachmentCount = 0;
+
+ std::atomic<uint64_t> DownloadedBlockCount = 0;
+ std::atomic<uint64_t> DownloadedBlockByteCount = 0;
+
+ std::atomic<uint64_t> DownloadedPartialBlockCount = 0;
+ std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0;
+ };
+
+ struct WriteChunkStatistics
+ {
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<uint64_t> ChunkBytesWritten = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
+ };
+
+ struct RebuildFolderStateStatistics
+ {
+ uint64_t CleanFolderElapsedWallTimeUs = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0;
+ std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0;
+ uint64_t FinalizeTreeElapsedWallTimeUs = 0;
+ };
+
+ struct VerifyFolderStatistics
+ {
+ std::atomic<uint64_t> FilesVerified = 0;
+ std::atomic<uint64_t> FilesFailed = 0;
+ std::atomic<uint64_t> ReadBytes = 0;
+ uint64_t VerifyElapsedWallTimeUs = 0;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -1035,7 +1093,15 @@ namespace {
class BufferedOpenFile
{
public:
- BufferedOpenFile(const std::filesystem::path Path) : Source(Path, BasicFile::Mode::kRead), SourceSize(Source.FileSize()) {}
+ BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats)
+ : m_Source(Path, BasicFile::Mode::kRead)
+ , m_SourceSize(m_Source.FileSize())
+ , m_DiskStats(DiskStats)
+ {
+ m_DiskStats.OpenReadCount++;
+ m_DiskStats.CurrentOpenFileCount++;
+ }
+ ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; }
BufferedOpenFile() = delete;
BufferedOpenFile(const BufferedOpenFile&) = delete;
BufferedOpenFile(BufferedOpenFile&&) = delete;
@@ -1047,10 +1113,10 @@ namespace {
{
ZEN_TRACE_CPU("BufferedOpenFile::GetRange");
- ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache);
- auto _ = MakeGuard([&]() { ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); });
+ ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache);
+ auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); });
- ZEN_ASSERT((Offset + Size) <= SourceSize);
+ ZEN_ASSERT((Offset + Size) <= m_SourceSize);
const uint64_t BlockIndexStart = Offset / BlockSize;
const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize;
@@ -1061,21 +1127,23 @@ namespace {
for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++)
{
const uint64_t BlockStartOffset = BlockIndex * BlockSize;
- if (CacheBlockIndex != BlockIndex)
+ if (m_CacheBlockIndex != BlockIndex)
{
- uint64_t CacheSize = Min(BlockSize, SourceSize - BlockStartOffset);
+ uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset);
ZEN_ASSERT(CacheSize > 0);
- Cache = IoBuffer(CacheSize);
- Source.Read(Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
- CacheBlockIndex = BlockIndex;
+ m_Cache = IoBuffer(CacheSize);
+ m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += CacheSize;
+ m_CacheBlockIndex = BlockIndex;
}
const uint64_t BytesRead = ReadOffset - Offset;
ZEN_ASSERT(BlockStartOffset <= ReadOffset);
const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset;
- ZEN_ASSERT(OffsetIntoBlock < Cache.GetSize());
- const uint64_t BlockBytes = Min(Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
- BufferRanges.emplace_back(SharedBuffer(IoBuffer(Cache, OffsetIntoBlock, BlockBytes)));
+ ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize());
+ const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
+ BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes)));
ReadOffset += BlockBytes;
}
CompositeBuffer Result(std::move(BufferRanges));
@@ -1084,10 +1152,11 @@ namespace {
}
private:
- BasicFile Source;
- const uint64_t SourceSize;
- uint64_t CacheBlockIndex = (uint64_t)-1;
- IoBuffer Cache;
+ BasicFile m_Source;
+ const uint64_t m_SourceSize;
+ DiskStatistics& m_DiskStats;
+ uint64_t m_CacheBlockIndex = (uint64_t)-1;
+ IoBuffer m_Cache;
};
class ReadFileCache
@@ -1106,11 +1175,7 @@ namespace {
{
m_OpenFiles.reserve(MaxOpenFileCount);
}
- ~ReadFileCache()
- {
- m_DiskStats.CurrentOpenFileCount -= m_OpenFiles.size();
- m_OpenFiles.clear();
- }
+ ~ReadFileCache() { m_OpenFiles.clear(); }
CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
{
@@ -1128,7 +1193,6 @@ namespace {
m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
}
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
return Result;
}
const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
@@ -1136,20 +1200,15 @@ namespace {
if (Size == m_LocalContent.RawSizes[LocalPathIndex])
{
IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
- m_DiskStats.OpenReadCount++;
- m_DiskStats.ReadByteCount += Result.GetSize();
return CompositeBuffer(SharedBuffer(Result));
}
if (m_OpenFiles.size() == m_OpenFiles.capacity())
{
m_OpenFiles.pop_back();
- m_DiskStats.CurrentOpenFileCount--;
}
- m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath)));
+ m_OpenFiles.insert(m_OpenFiles.begin(),
+ std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats)));
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
- m_DiskStats.ReadByteCount += Result.GetSize();
- m_DiskStats.OpenReadCount++;
- m_DiskStats.CurrentOpenFileCount++;
return Result;
}
@@ -1186,17 +1245,21 @@ namespace {
}
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ RawSize,
+ [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
}
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -1349,8 +1412,7 @@ namespace {
const std::uint64_t PreferredMultipartChunkSize,
ParallellWork& Work,
WorkerThreadPool& NetworkPool,
- std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& MultipartAttachmentCount,
+ DownloadStatistics& DownloadStats,
std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -1372,10 +1434,10 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [Workload, &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
- const IoBuffer& Chunk,
- uint64_t BytesRemaining) {
- BytesDownloaded += Chunk.GetSize();
+ [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
+ const IoBuffer& Chunk,
+ uint64_t BytesRemaining) {
+ DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
if (!AbortFlag.load())
{
@@ -1383,6 +1445,7 @@ namespace {
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
+ DownloadStats.DownloadedChunkCount++;
uint64_t PayloadSize = Workload->TempFile.FileSize();
void* FileHandle = Workload->TempFile.Detach();
ZEN_ASSERT(FileHandle != nullptr);
@@ -1394,7 +1457,7 @@ namespace {
});
if (!WorkItems.empty())
{
- MultipartAttachmentCount++;
+ DownloadStats.MultipartAttachmentCount++;
}
for (auto& WorkItem : WorkItems)
{
@@ -1411,7 +1474,23 @@ namespace {
}
}
- void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
+
+ void ValidateBuildPart(BuildStorage& Storage,
+ const Oid& BuildId,
+ Oid BuildPartId,
+ const std::string_view BuildPartName,
+ ValidateStatistics& ValidateStats,
+ DownloadStatistics& DownloadStats)
{
Stopwatch Timer;
auto _ = MakeGuard([&]() {
@@ -1430,23 +1509,27 @@ namespace {
throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
}
}
+ ValidateStats.BuildBlobSize = Build.GetSize();
uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
{
PreferredMultipartChunkSize = ChunkSize;
}
- CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
+ ValidateStats.BuildPartSize = BuildPart.GetSize();
ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
std::vector<IoHash> ChunkAttachments;
for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
{
ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
}
+ ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
std::vector<IoHash> BlockAttachments;
for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
{
BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
}
+ ValidateStats.BlockAttachmentCount = BlockAttachments.size();
std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
@@ -1456,7 +1539,6 @@ namespace {
}
WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& ReadPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
ParallellWork Work(AbortFlag);
@@ -1472,20 +1554,16 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
- std::atomic<uint64_t> DownloadedAttachmentCount = 0;
- std::atomic<uint64_t> VerifiedAttachmentCount = 0;
- std::atomic<uint64_t> DownloadedByteCount = 0;
- std::atomic<uint64_t> VerifiedByteCount = 0;
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredVerifiedBytesPerSecond;
+ uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredVerifiedBytesPerSecond;
std::atomic<uint64_t> MultipartAttachmentCount = 0;
for (const IoHash& ChunkAttachment : ChunkAttachments)
{
Work.ScheduleWork(
- ReadPool,
+ NetworkPool,
[&, ChunkAttachment](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -1499,8 +1577,7 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- DownloadedByteCount,
- MultipartAttachmentCount,
+ DownloadStats,
[&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
Payload.SetContentType(ZenContentType::kCompressedBinary);
if (!AbortFlag)
@@ -1512,6 +1589,12 @@ namespace {
{
ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
@@ -1521,9 +1604,9 @@ namespace {
ChunkHash,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1548,9 +1631,9 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadedAttachmentCount++;
- DownloadedByteCount += Payload.GetSize();
- if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -1576,9 +1659,9 @@ namespace {
BlockAttachment,
NiceBytes(CompressedSize),
NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
{
FilteredVerifiedBytesPerSecond.Stop();
}
@@ -1594,17 +1677,20 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
+ const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
+
FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
- FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount);
+ FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount);
std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
- DownloadedAttachmentCount.load(),
+ DownloadedAttachmentCount,
AttachmentsToVerifyCount,
- NiceBytes(DownloadedByteCount.load()),
+ NiceBytes(DownloadedByteCount),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- VerifiedAttachmentCount.load(),
+ ValidateStats.VerifiedAttachmentCount.load(),
AttachmentsToVerifyCount,
- NiceBytes(VerifiedByteCount.load()),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
ProgressBar.UpdateState(
@@ -1612,11 +1698,12 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
false);
});
ProgressBar.Finish();
+ ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
}
void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
@@ -1717,7 +1804,9 @@ namespace {
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
uint32_t ChunkIndex,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ std::atomic<uint64_t>& ReadRawBytes,
+ LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("CompressChunk");
ZEN_ASSERT(!TempFolderPath.empty());
@@ -1750,7 +1839,12 @@ namespace {
bool CouldCompress = CompressedBuffer::CompressToStream(
CompositeBuffer(SharedBuffer(RawSource)),
- [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); });
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ReadRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset);
+ LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ });
if (CouldCompress)
{
uint64_t CompressedSize = CompressedFile.FileSize();
@@ -1768,6 +1862,9 @@ namespace {
ZEN_ASSERT(Compressed);
ZEN_ASSERT(RawHash == ChunkHash);
ZEN_ASSERT(RawSize == ChunkSize);
+
+ LooseChunksStats.CompressedChunkCount++;
+
return Compressed.GetCompressed();
}
CompressedFile.Close();
@@ -2007,7 +2104,6 @@ namespace {
const std::uint64_t LargeAttachmentSize,
DiskStatistics& DiskStats,
UploadStatistics& UploadStats,
- GenerateBlocksStatistics& GenerateBlocksStats,
LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("UploadPartBlobs");
@@ -2257,8 +2353,6 @@ namespace {
Payload = std::move(CompressedBlock).GetCompressed();
}
- GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
GeneratedBlockCount++;
if (GeneratedBlockCount == GenerateBlockIndexes.size())
@@ -2279,9 +2373,7 @@ namespace {
}
}
- std::atomic<uint64_t> CompressedLooseChunkCount = 0;
- std::atomic<uint64_t> CompressedLooseChunkByteCount = 0;
- std::atomic<uint64_t> RawLooseChunkByteCount = 0;
+ std::atomic<uint64_t> RawLooseChunkByteCount = 0;
// Start compression of any non-precompressed loose chunks and schedule upload
for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes)
@@ -2295,19 +2387,20 @@ namespace {
ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
FilteredCompressedBytesPerSecond.Start();
- CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName);
+ CompositeBuffer Payload = CompressChunk(Path,
+ Content,
+ Lookup,
+ ChunkIndex,
+ Path / ZenTempChunkFolderName,
+ RawLooseChunkByteCount,
+ LooseChunksStats);
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})",
Content.ChunkedContent.ChunkHashes[ChunkIndex],
NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
NiceBytes(Payload.GetSize()));
const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
UploadStats.ReadFromDiskBytes += ChunkRawSize;
- LooseChunksStats.CompressedChunkBytes += Payload.GetSize();
- LooseChunksStats.CompressedChunkCount++;
- CompressedLooseChunkByteCount += Payload.GetSize();
- CompressedLooseChunkCount++;
- RawLooseChunkByteCount += ChunkRawSize;
- if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size())
+ if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size())
{
FilteredCompressedBytesPerSecond.Stop();
}
@@ -2322,20 +2415,21 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load());
+ FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
std::string Details = fmt::format(
- "Compressed {}/{} ({}/{}) chunks. "
+ "Compressed {}/{} ({}/{} {}B/s) chunks. "
"Uploaded {}/{} ({}/{}) blobs "
"({} {}bits/s)",
- CompressedLooseChunkCount.load(),
+ LooseChunksStats.CompressedChunkCount.load(),
CompressLooseChunkOrderIndexes.size(),
NiceBytes(RawLooseChunkByteCount),
NiceBytes(TotalLooseChunksSize),
+ NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
UploadedBlockCount.load() + UploadedChunkCount.load(),
UploadBlockCount + UploadChunkCount,
@@ -2355,9 +2449,8 @@ namespace {
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2550,6 +2643,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName);
+ std::uint64_t TotalRawSize = 0;
+
CbObject ChunkerParameters;
struct PrepareBuildResult
@@ -2770,7 +2865,7 @@ namespace {
ChunkerParameters = ChunkParametersWriter.Save();
}
- std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
{
ProgressBar ProgressBar(UsePlainProgress);
@@ -3139,21 +3234,16 @@ namespace {
{
ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray<IoHash>(RawHashes, "\n "sv));
- UploadStatistics TempUploadStats;
- GenerateBlocksStatistics TempGenerateBlocksStats;
- LooseChunksStatistics TempLooseChunksStats;
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
Stopwatch TempUploadTimer;
auto __ = MakeGuard([&]() {
uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
ZEN_CONSOLE(
- "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. "
+ "Uploaded {} ({}) blocks. "
"Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
"Transferred {} ({}bits/s) in {}",
- TempGenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()),
- NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
- TempGenerateBlocksStats.GeneratedBlockByteCount)),
TempUploadStats.BlockCount.load(),
NiceBytes(TempUploadStats.BlocksBytes),
@@ -3180,11 +3270,9 @@ namespace {
LargeAttachmentSize,
DiskStats,
TempUploadStats,
- TempGenerateBlocksStats,
TempLooseChunksStats);
UploadStats += TempUploadStats;
LooseChunksStats += TempLooseChunksStats;
- GenerateBlocksStats += TempGenerateBlocksStats;
}
};
if (IgnoreExistingBlocks)
@@ -3266,18 +3354,25 @@ namespace {
}
}
+ ValidateStatistics ValidateStats;
+ DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName);
+ ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
- const double DeltaByteCountPercent =
- ChunkingStats.BytesHashed > 0
- ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
- : 0.0;
-
- const std::string LargeAttachmentStats =
- (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+ struct ValidateStatistics
+ {
+ uint64_t BuildBlobSize = 0;
+ uint64_t BuildPartSize = 0;
+ uint64_t ChunkAttachmentCount = 0;
+ uint64_t BlockAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedAttachmentCount = 0;
+ std::atomic<uint64_t> VerifiedAttachmentCount = 0;
+ std::atomic<uint64_t> DownloadedByteCount = 0;
+ std::atomic<uint64_t> VerifiedByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+ };
ZEN_CONSOLE_VERBOSE(
"Folder scanning stats:"
@@ -3401,42 +3496,122 @@ namespace {
UploadStats.MultipartAttachmentCount.load(),
NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000));
+ if (PostUploadVerify)
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Validate stats:"
+ "\n BuildBlobSize: {}"
+ "\n BuildPartSize: {}"
+ "\n ChunkAttachmentCount: {}"
+ "\n BlockAttachmentCount: {}"
+ "\n VerifiedAttachmentCount: {}"
+ "\n VerifiedByteCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ NiceBytes(ValidateStats.BuildBlobSize),
+ NiceBytes(ValidateStats.BuildPartSize),
+ ValidateStats.ChunkAttachmentCount,
+ ValidateStats.BlockAttachmentCount,
+ ValidateStats.VerifiedAttachmentCount.load(),
+ NiceBytes(ValidateStats.VerifiedByteCount.load()),
+ NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE_VERBOSE(
+ "Validate download stats:"
+ "\n RequestsCompleteCount: {}"
+ "\n DownloadedChunkCount: {}"
+ "\n DownloadedChunkByteCount: {}"
+ "\n MultipartAttachmentCount: {}"
+ "\n DownloadedBlockCount: {}"
+ "\n DownloadedBlockByteCount: {}"
+ "\n DownloadedPartialBlockCount: {}"
+ "\n DownloadedPartialBlockByteCount: {}",
+ ValidateDownloadStats.RequestsCompleteCount.load(),
+ ValidateDownloadStats.DownloadedChunkCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()),
+ ValidateDownloadStats.MultipartAttachmentCount.load(),
+ ValidateDownloadStats.DownloadedBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()),
+ ValidateDownloadStats.DownloadedPartialBlockCount.load(),
+ NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load()));
+ }
+
+ const double DeltaByteCountPercent =
+ ChunkingStats.BytesHashed > 0
+ ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed)
+ : 0.0;
+
+ const std::string MultipartAttachmentStats =
+ (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : "";
+
+ std::string ValidateInfo;
+ if (PostUploadVerify)
+ {
+ const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount =
+ ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ ValidateInfo = fmt::format("\n Verified: {} ({}) {}B/sec in {}",
+ DownloadedCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ }
+
ZEN_CONSOLE(
- "Uploaded {}\n"
- " Delta: {}/{} ({:.1f}%)\n"
- " Blocks: {} ({})\n"
- " Chunks: {} ({}){}\n"
- " Rate: {}bits/sec",
- NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ "Uploaded part {} ('{}') to build {} in {} \n"
+ " Scanned files: {} ({}) {}B/sec in {}\n"
+ " New data: {} ({:.1f}%)\n"
+ " New blocks: {} ({}) {}B/sec\n"
+ " New chunks: {} ({} -> {}) {}B/sec\n"
+ " Uploaded: {} ({}) {}bits/sec\n"
+ " Blocks: {} ({})\n"
+ " Chunks: {} ({}){}"
+ "{}",
+ BuildPartId,
+ BuildPartName,
+ BuildId,
+ NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()),
+
+ LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()),
+ NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)),
+ NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000),
NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- NiceBytes(ChunkingStats.BytesHashed),
DeltaByteCountPercent,
+ GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)),
+
+ LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(LooseChunksStats.ChunkByteCount),
+ NiceBytes(LooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)),
+
+ NiceBytes(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load()),
+ NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))),
+
UploadStats.BlockCount.load(),
- NiceBytes(UploadStats.BlocksBytes),
- UploadStats.ChunkCount.load(),
- NiceBytes(UploadStats.ChunksBytes),
- LargeAttachmentStats,
+ NiceBytes(UploadStats.BlocksBytes.load()),
- NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))));
+ UploadStats.ChunkCount.load(),
+ NiceBytes(UploadStats.ChunksBytes.load()),
+ MultipartAttachmentStats,
- ZEN_CONSOLE("Uploaded ({}) build {} part {} ({}) in {}",
- NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
- BuildId,
- BuildPartName,
- BuildPartId,
- NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()));
+ ValidateInfo);
}
- void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash)
+ void VerifyFolder(const ChunkedFolderContent& Content,
+ const std::filesystem::path& Path,
+ bool VerifyFileHash,
+ VerifyFolderStatistics& VerifyFolderStats)
{
ZEN_TRACE_CPU("VerifyFolder");
- ProgressBar ProgressBar(UsePlainProgress);
- std::atomic<uint64_t> FilesVerified(0);
- std::atomic<uint64_t> FilesFailed(0);
- std::atomic<uint64_t> ReadBytes(0);
+ Stopwatch Timer;
+
+ ProgressBar ProgressBar(UsePlainProgress);
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -3492,7 +3667,7 @@ namespace {
ErrorLock.WithExclusiveLock([&]() {
Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else
{
@@ -3504,7 +3679,7 @@ namespace {
Errors.push_back(
fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk < ExpectedSize)
{
@@ -3514,7 +3689,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > ExpectedSize)
{
@@ -3524,7 +3699,7 @@ namespace {
ExpectedSize,
SizeOnDisk));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
else if (SizeOnDisk > 0 && VerifyFileHash)
{
@@ -3559,13 +3734,13 @@ namespace {
}
FileOffset += ChunkSize;
}
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
}
- ReadBytes += SizeOnDisk;
+ VerifyFolderStats.ReadBytes += SizeOnDisk;
}
}
}
- FilesVerified++;
+ VerifyFolderStats.FilesVerified++;
}
},
[&, PathIndex](const std::exception& Ex, std::atomic<bool>&) {
@@ -3574,23 +3749,25 @@ namespace {
(Path / Content.Paths[PathIndex]).make_preferred(),
Ex.what()));
});
- FilesFailed++;
+ VerifyFolderStats.FilesFailed++;
});
}
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
- FilesVerified.load(),
+ VerifyFolderStats.FilesVerified.load(),
PathCount,
- NiceBytes(ReadBytes.load()),
- FilesFailed.load());
+ NiceBytes(VerifyFolderStats.ReadBytes.load()),
+ VerifyFolderStats.FilesFailed.load());
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
false);
});
+ VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
ProgressBar.Finish();
for (const std::string& Error : Errors)
{
@@ -3605,7 +3782,7 @@ namespace {
class WriteFileCache
{
public:
- WriteFileCache() {}
+ WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
~WriteFileCache() { Flush(); }
template<typename TBufferType>
@@ -3621,6 +3798,8 @@ namespace {
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
ZEN_ASSERT(OpenFileWriter);
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
}
else
{
@@ -3643,6 +3822,8 @@ namespace {
}
return --Tries > 0;
});
+ m_DiskStats.OpenWriteCount++;
+ m_DiskStats.CurrentOpenFileCount++;
}
const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
@@ -3654,12 +3835,18 @@ namespace {
OutputFile = std::move(NewOutputFile);
OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
OpenFileWriter->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
SeenTargetIndexes.push_back(TargetIndex);
}
else
{
ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
NewOutputFile->Write(Buffer, FileOffset);
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += Buffer.GetSize();
+ NewOutputFile = {};
+ m_DiskStats.CurrentOpenFileCount--;
}
}
}
@@ -3667,9 +3854,15 @@ namespace {
void Flush()
{
ZEN_TRACE_CPU("WriteFileCache_Flush");
+ if (OutputFile)
+ {
+ m_DiskStats.CurrentOpenFileCount--;
+ }
+
OpenFileWriter = {};
OutputFile = {};
}
+ DiskStatistics& m_DiskStats;
std::vector<uint32_t> SeenTargetIndexes;
std::unique_ptr<BasicFile> OutputFile;
std::unique_ptr<BasicFileWriter> OpenFileWriter;
@@ -3696,6 +3889,100 @@ namespace {
return ChunkTargetPtrs;
};
+ void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequence");
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ }
+
+ void FinalizeChunkSequences(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequences");
+ for (uint32_t SequenceIndex : RemoteSequenceIndexes)
+ {
+ FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ }
+ }
+
+ void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool)
+ {
+ if (RemoteSequenceIndexes.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(
+ VerifyPool,
+ [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+
+ bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1;
+ }
+
+ std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ std::vector<uint32_t> CompletedSequenceIndexes;
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+ return CompletedSequenceIndexes;
+ }
+
struct BlockWriteOps
{
std::vector<CompositeBuffer> ChunkBuffers;
@@ -3712,12 +3999,14 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
- WriteFileCache OpenFileCache;
+ WriteFileCache OpenFileCache(DiskStats);
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
if (AbortFlag)
@@ -3743,36 +4032,76 @@ namespace {
Chunk,
FileOffset,
RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
}
+ WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ WriteChunkStats.ChunkBytesWritten +=
+ std::accumulate(Ops.ChunkBuffers.begin(),
+ Ops.ChunkBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
- ZEN_TRACE_CPU("VerifyChunkHashes_rename");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
+ }
+ }
+
+ IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
+ {
+ ZEN_TRACE_CPU("MakeBufferMemoryBased");
+ IoBuffer BlockMemoryBuffer;
+ std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBufferFileReference FileRef = {};
+ if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef))
+ {
+ BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer();
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ return BlockMemoryBuffer;
+ }
+ else
+ {
+ return PartialBlockBuffer.GetSegments().front().AsIoBuffer();
+ }
+ }
+ else
+ {
+ // Not a homogenous memory buffer, read all to memory
+
+ BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer();
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ IoBufferFileReference FileRef = {};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ Reader.Detach();
+ ReadMem = ReadMem.Mid(FileRef.FileChunkSize);
+ }
+ else
+ {
+ ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView());
}
}
- OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ return BlockMemoryBuffer;
}
}
@@ -3780,31 +4109,14 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<const IoHash> ChunkRawHashes,
std::span<const uint32_t> ChunkCompressedLengths,
- std::span<const uint32_t> ChunkRawLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- CompositeBuffer&& PartialBlockBuffer,
+ const MemoryView BlockView,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
BlockWriteOps& OutOps)
{
ZEN_TRACE_CPU("GetBlockWriteOps");
- MemoryView BlockMemoryView;
- UniqueBuffer BlockMemoryBuffer;
- IoBufferFileReference FileRef = {};
- if (PartialBlockBuffer.GetSegments().size() == 1 && PartialBlockBuffer.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
- {
- BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize);
- BasicFile Reader;
- Reader.Attach(FileRef.FileHandle);
- Reader.Read(BlockMemoryBuffer.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
- BlockMemoryView = BlockMemoryBuffer.GetView();
- Reader.Detach();
- }
- else
- {
- BlockMemoryView = PartialBlockBuffer.ViewOrCopyRange(0, PartialBlockBuffer.GetSize(), BlockMemoryBuffer);
- }
uint32_t OffsetInBlock = 0;
for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
{
@@ -3821,32 +4133,9 @@ namespace {
bool NeedsWrite = true;
if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
{
- // CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
- MemoryView ChunkMemory = BlockMemoryView.Mid(OffsetInBlock, ChunkCompressedSize);
- CompositeBuffer Chunk = CompositeBuffer(IoBuffer(IoBuffer::Wrap, ChunkMemory.GetData(), ChunkMemory.GetSize()));
- IoHash VerifyChunkHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
- if (!Compressed)
- {
- ZEN_ASSERT(false);
- }
- if (VerifyChunkHash != ChunkHash)
- {
- ZEN_ASSERT(false);
- }
- if (!ChunkRawLengths.empty())
- {
- if (VerifyRawSize != ChunkRawLengths[ChunkBlockIndex])
- {
- ZEN_ASSERT(false);
- }
- }
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (!Decompressed)
- {
- throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
- }
+ MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(),
+ ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder());
+ IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize());
ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
@@ -3861,19 +4150,22 @@ namespace {
OffsetInBlock += ChunkCompressedSize;
}
- std::sort(OutOps.WriteOps.begin(),
- OutOps.WriteOps.end(),
- [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
+ {
+ ZEN_TRACE_CPU("GetBlockWriteOps_sort");
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ }
return true;
}
@@ -3881,34 +4173,35 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
BlockWriteOps Ops;
if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
{
ZEN_TRACE_CPU("WriteBlockToDisk_Legacy");
- UniqueBuffer CopyBuffer;
- const MemoryView BlockView = BlockBuffer.ViewOrCopyRange(0, BlockBuffer.GetSize(), CopyBuffer);
uint64_t HeaderSize;
- const std::vector<uint32_t> ChunkCompressedLengths = ReadChunkBlockHeader(BlockView, HeaderSize);
-
- CompositeBuffer PartialBlockBuffer = std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize);
+ const std::vector<uint32_t> ChunkCompressedLengths =
+ ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
0,
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
@@ -3918,23 +4211,22 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
}
- CompositeBuffer PartialBlockBuffer =
- std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
0,
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
@@ -3944,8 +4236,10 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
return false;
@@ -3955,24 +4249,29 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& PartialBlockBuffer,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
BlockWriteOps Ops;
if (GetBlockWriteOps(RemoteContent,
Lookup,
BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
- BlockDescription.ChunkRawLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::move(PartialBlockBuffer),
+ BlockView,
FirstIncludedBlockChunkIndex,
LastIncludedBlockChunkIndex,
Ops))
@@ -3982,8 +4281,10 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
- OutChunksComplete,
- OutBytesWritten);
+ Work,
+ VerifyPool,
+ DiskStats,
+ WriteChunkStats);
return true;
}
else
@@ -4031,8 +4332,7 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets,
CompositeBuffer&& ChunkData,
- WriteFileCache& OpenFileCache,
- std::atomic<uint64_t>& OutBytesWritten)
+ WriteFileCache& OpenFileCache)
{
ZEN_TRACE_CPU("WriteChunkToDisk");
@@ -4051,7 +4351,6 @@ namespace {
ChunkData,
FileOffset,
Content.RawSizes[PathIndex]);
- OutBytesWritten += ChunkData.GetSize();
}
}
@@ -4072,7 +4371,8 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4095,21 +4395,29 @@ namespace {
{
throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
}
+
IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_TRACE_CPU("StreamDecompress_Write");
- if (!AbortFlag)
- {
- DecompressedTemp.Write(RangeBuffer, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
{
- Hash.Append(Segment.GetView());
+ WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
+ DecompressedTemp.Write(RangeBuffer, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ DiskStats.WriteByteCount += RangeBuffer.GetSize();
+ DiskStats.WriteCount++;
+ return true;
}
- WriteToDiskBytes += RangeBuffer.GetSize();
- return true;
- }
- return false;
- });
+ return false;
+ });
if (AbortFlag)
{
@@ -4132,6 +4440,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
+ WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4140,74 +4449,40 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- std::atomic<uint64_t>& WriteToDiskBytes)
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes);
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
}
else
{
- const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
- SharedBuffer Chunk =
- Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize);
if (!AbortFlag)
{
- WriteFileCache OpenFileCache;
-
+ WriteFileCache OpenFileCache(DiskStats);
WriteChunkToDisk(TargetFolder,
RemoteContent,
RemoteLookup,
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
- OpenFileCache,
- WriteToDiskBytes);
+ OpenFileCache);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
return false;
}
- void CompleteChunkTargets(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const IoHash& ChunkHash,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const bool NeedHashVerify)
- {
- ZEN_TRACE_CPU("CompleteChunkTargets");
-
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (NeedHashVerify)
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
-
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != ChunkHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash));
- }
- }
-
- ZEN_TRACE_CPU("RenameToFinal");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
- }
- }
- }
-
void AsyncWriteDownloadedChunk(const std::filesystem::path& Path,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
@@ -4217,19 +4492,17 @@ namespace {
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::atomic<uint64_t>& WriteToDiskBytes,
- std::atomic<uint32_t>& ChunkCountWritten,
std::atomic<uint64_t>& WritePartsComplete,
- std::atomic<uint64_t>& TotalPartWriteCount,
- std::atomic<uint64_t>& LooseChunksBytes,
- FilteredRate& FilteredWrittenBytesPerSecond)
+ const uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredWrittenBytesPerSecond,
+ DiskStatistics& DiskStats,
+ WriteChunkStatistics& WriteChunkStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- uint64_t Size = Payload.GetSize();
- LooseChunksBytes += Size;
+ const uint64_t Size = Payload.GetSize();
std::filesystem::path CompressedChunkPath;
@@ -4275,6 +4548,7 @@ namespace {
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
RemoteChunkIndex,
+ TotalPartWriteCount,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
@@ -4307,11 +4581,10 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
-
+ DiskStats,
+ WriteChunkStats);
if (!AbortFlag)
{
- ChunkCountWritten++;
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
@@ -4320,12 +4593,16 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -4343,19 +4620,16 @@ namespace {
const std::vector<IoHash>& LooseChunkHashes,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
- FolderContent& OutLocalFolderState)
+ FolderContent& OutLocalFolderState,
+ DiskStatistics& DiskStats,
+ CacheMappingStatistics& CacheMappingStats,
+ DownloadStatistics& DownloadStats,
+ WriteChunkStatistics& WriteChunkStats,
+ RebuildFolderStateStatistics& RebuildFolderStateStats)
{
ZEN_TRACE_CPU("UpdateFolder");
ZEN_UNUSED(WipeTargetFolder);
- std::atomic<uint64_t> DownloadedBlocks = 0;
- std::atomic<uint64_t> BlockBytes = 0;
- std::atomic<uint64_t> DownloadedChunks = 0;
- std::atomic<uint64_t> LooseChunksBytes = 0;
- std::atomic<uint64_t> WriteToDiskBytes = 0;
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
-
- DiskStatistics DiskStats;
Stopwatch IndexTimer;
@@ -4370,15 +4644,11 @@ namespace {
Stopwatch CacheMappingTimer;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
- // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false);
- std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly)
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- uint64_t CachedChunkHashesByteCountFound = 0;
- uint64_t CachedSequenceHashesByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
@@ -4399,7 +4669,8 @@ namespace {
if (ChunkSize == CacheDirContent.FileSizes[Index])
{
CachedChunkHashesFound.insert({FileHash, ChunkIndex});
- CachedChunkHashesByteCountFound += ChunkSize;
+ CacheMappingStats.CacheChunkCount++;
+ CacheMappingStats.CacheChunkByteCount += ChunkSize;
continue;
}
}
@@ -4412,7 +4683,8 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CachedSequenceHashesByteCountFound += SequenceSize;
+ CacheMappingStats.CacheSequenceHashesCount += SequenceSize;
+ CacheMappingStats.CacheSequenceHashesByteCount++;
continue;
}
}
@@ -4422,7 +4694,6 @@ namespace {
}
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- uint64_t CachedBlocksByteCountFound = 0;
{
ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
@@ -4457,7 +4728,8 @@ namespace {
if (BlockSize == BlockDirContent.FileSizes[Index])
{
CachedBlocksFound.insert({FileHash, BlockIndex});
- CachedBlocksByteCountFound += BlockSize;
+ CacheMappingStats.CacheBlockCount++;
+ CacheMappingStats.CacheBlocksByteCount += BlockSize;
continue;
}
}
@@ -4467,7 +4739,6 @@ namespace {
}
std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0;
// Pick up all whole files we can use from current local state
{
ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
@@ -4496,13 +4767,14 @@ namespace {
const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
- LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize;
+ CacheMappingStats.LocalPathsMatchingSequencesCount++;
+ CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize;
}
else
{
// We must write the sequence
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] =
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
}
}
}
@@ -4514,7 +4786,7 @@ namespace {
struct ChunkTarget
{
uint32_t TargetChunkLocationCount = (uint32_t)-1;
- uint64_t ChunkRawSize = (uint64_t)-1;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
uint64_t CacheFileOffset = (uint64_t)-1;
};
std::vector<ChunkTarget> ChunkTargets;
@@ -4522,8 +4794,6 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
- uint64_t LocalChunkHashesMatchingRemoteCount = 0;
- uint64_t LocalChunkHashesMatchingRemoteByteCount = 0;
{
ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
@@ -4556,7 +4826,7 @@ namespace {
{
CacheCopyData::ChunkTarget Target = {
.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .ChunkRawSize = LocalChunkRawSize,
+ .RemoteChunkIndex = RemoteChunkIndex,
.CacheFileOffset = SourceOffset};
if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
CopySourceIt != RawHashToCacheCopyDataIndex.end())
@@ -4586,8 +4856,8 @@ namespace {
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
}
- LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize;
- LocalChunkHashesMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteCount++;
+ CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
}
}
@@ -4599,20 +4869,20 @@ namespace {
}
if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() ||
- !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0)
+ !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
{
ZEN_CONSOLE(
"Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks",
CachedSequenceHashesFound.size(),
- NiceBytes(CachedSequenceHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
CachedChunkHashesFound.size(),
- NiceBytes(CachedChunkHashesByteCountFound),
+ NiceBytes(CacheMappingStats.CacheChunkByteCount),
CachedBlocksFound.size(),
- NiceBytes(CachedBlocksByteCountFound),
+ NiceBytes(CacheMappingStats.CacheBlocksByteCount),
LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes),
- LocalChunkHashesMatchingRemoteCount,
- NiceBytes(LocalChunkHashesMatchingRemoteByteCount));
+ NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
}
uint32_t ChunkCountToWrite = 0;
@@ -4635,9 +4905,7 @@ namespace {
}
uint64_t TotalRequestCount = 0;
- std::atomic<uint64_t> RequestsComplete = 0;
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> TotalPartWriteCount = 0;
+ uint64_t TotalPartWriteCount = 0;
std::atomic<uint64_t> WritePartsComplete = 0;
{
@@ -4654,8 +4922,6 @@ namespace {
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
- std::atomic<uint64_t> BytesDownloaded = 0;
-
struct LooseChunkHashWorkData
{
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
@@ -4737,12 +5003,6 @@ namespace {
std::vector<uint32_t> FullBlockWorks;
- size_t BlocksNeededCount = 0;
- uint64_t AllBlocksSize = 0;
- uint64_t AllBlocksFetch = 0;
- uint64_t AllBlocksSlack = 0;
- uint64_t AllBlockRequests = 0;
- uint64_t AllBlockChunksSize = 0;
for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
@@ -4798,7 +5058,6 @@ namespace {
}
else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- AllBlockChunksSize += ChunkCompressedLength;
if (NextRange.RangeLength == 0)
{
NextRange.RangeStart = CurrentOffset;
@@ -4815,7 +5074,6 @@ namespace {
ZEN_ASSERT(false);
}
}
- AllBlocksSize += CurrentOffset;
if (NextRange.RangeLength > 0)
{
BlockRanges.push_back(NextRange);
@@ -4825,7 +5083,6 @@ namespace {
std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
auto It = BlockRanges.begin();
CollapsedBlockRanges.push_back(*It++);
- uint64_t TotalSlack = 0;
while (It != BlockRanges.end())
{
BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
@@ -4836,7 +5093,6 @@ namespace {
LastRange.ChunkBlockIndexCount =
(It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
- TotalSlack += Slack;
}
else
{
@@ -4845,17 +5101,6 @@ namespace {
++It;
}
- uint64_t TotalFetch = 0;
- for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
- {
- TotalFetch += Range.RangeLength;
- }
-
- AllBlocksFetch += TotalFetch;
- AllBlocksSlack += TotalSlack;
- BlocksNeededCount++;
- AllBlockRequests += CollapsedBlockRanges.size();
-
TotalRequestCount += CollapsedBlockRanges.size();
TotalPartWriteCount += CollapsedBlockRanges.size();
@@ -4863,7 +5108,6 @@ namespace {
}
else
{
- BlocksNeededCount++;
TotalRequestCount++;
TotalPartWriteCount++;
@@ -4877,154 +5121,6 @@ namespace {
}
}
- for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
- [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
-
- uint64_t CacheLocalFileBytesRead = 0;
-
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
-
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- uint64_t CacheFileOffset = (uint64_t)-1;
- uint64_t ChunkSize = (uint64_t)-1;
- };
-
- std::vector<WriteOp> WriteOps;
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Sort");
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
- {
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
- }
-
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
- }
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Write");
-
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
- {
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
- }
- }
- if (!AbortFlag)
- {
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOp& Op : WriteOps)
- {
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
-
- ZEN_TRACE_CPU("rename");
- ZEN_ASSERT_SLOW(
- !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
- }
-
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
- }
-
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
if (AbortFlag)
@@ -5057,9 +5153,8 @@ namespace {
uint64_t RawSize;
if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
{
- LooseChunksBytes += ExistingCompressedPart.GetSize();
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5082,8 +5177,10 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
- &WriteToDiskBytes,
- &ChunkCountWritten,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5113,12 +5210,15 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- WriteToDiskBytes);
+ DiskStats,
+ WriteChunkStats);
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten +=
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
+ WritePartsComplete++;
if (!AbortFlag)
{
- ChunkCountWritten++;
- WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5126,12 +5226,20 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -5151,11 +5259,10 @@ namespace {
PreferredMultipartChunkSize,
Work,
NetworkPool,
- BytesDownloaded,
- MultipartAttachmentCount,
+ DownloadStats,
[&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5168,12 +5275,11 @@ namespace {
WritePool,
std::move(Payload),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
});
}
else
@@ -5186,10 +5292,10 @@ namespace {
throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
uint64_t BlobSize = BuildBlob.GetSize();
- BytesDownloaded += BlobSize;
-
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5202,12 +5308,11 @@ namespace {
WritePool,
std::move(BuildBlob),
SequenceIndexChunksLeftToWriteCounters,
- WriteToDiskBytes,
- ChunkCountWritten,
WritePartsComplete,
TotalPartWriteCount,
- LooseChunksBytes,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ DiskStats,
+ WriteChunkStats);
}
}
}
@@ -5215,6 +5320,186 @@ namespace {
Work.DefaultErrorFunction());
}
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(),//
+ [&, CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
+
+ FilteredWrittenBytesPerSecond.Start();
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
+ const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
+
+ uint64_t CacheLocalFileBytesRead = 0;
+
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
+
+ std::vector<WriteOp> WriteOps;
+
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkIndex = ChunkTarget.RemoteChunkIndex});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
+
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(LocalFilePath, DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+ {
+ break;
+ }
+ if (NextOp.Target->Offset != OpTargetEnd)
+ {
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > 512u * 1024u)
+ {
+ break;
+ }
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
+
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++)
+ {
+ const WriteOp& WrittenOp = WriteOps[WrittenOpIndex];
+ if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second)
+ {
+ WriteChunkStats.ChunkCountWritten++;
+ WriteChunkStats.ChunkBytesWritten +=
+ RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex];
+ }
+ }
+
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+
+ WriteOpIndex += WriteCount;
+ }
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const WriteOp& Op : WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
+ RemoteContent,
+ CompletedChunkSequences,
+ Work,
+ WritePool);
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
if (AbortFlag)
@@ -5244,11 +5529,13 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
@@ -5291,11 +5578,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5381,26 +5667,28 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockPartialBuffer)),
BlockRange.ChunkBlockIndexStart,
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(
fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5436,11 +5724,10 @@ namespace {
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
uint64_t BlockSize = BlockBuffer.GetSize();
- BytesDownloaded += BlockSize;
- BlockBytes += BlockSize;
- DownloadedBlocks++;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -5488,15 +5775,17 @@ namespace {
{
Work.ScheduleWork(
WritePool, // WritePool, GetSyncWorkerPool()
- [&RemoteContent,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
&RemoteLookup,
CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
&BlockDescriptions,
- &ChunkCountWritten,
- &WriteToDiskBytes,
+ &WriteChunkStats,
+ &DiskStats,
&WritePartsComplete,
&TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
@@ -5529,23 +5818,26 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- ChunkCountWritten,
- WriteToDiskBytes))
+ DiskStats,
+ WriteChunkStats))
{
std::error_code DummyEc;
std::filesystem::remove(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
if (!BlockChunkPath.empty())
{
std::filesystem::remove(BlockChunkPath);
}
+ WritePartsComplete++;
+
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -5559,35 +5851,32 @@ namespace {
Work.DefaultErrorFunction());
}
- ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
- NiceBytes(AllBlocksFetch),
- NiceBytes(AllBlocksSlack),
- NiceBytes(AllBlockChunksSize),
- NiceBytes(AllBlocksSize),
- AllBlockRequests,
- BlocksNeededCount);
{
ZEN_TRACE_CPU("WriteChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
- FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
- FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
+ ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
+ uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+ +DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
- RequestsComplete.load(),
+ DownloadStats.RequestsCompleteCount.load(),
TotalRequestCount,
- NiceBytes(BytesDownloaded.load()),
+ NiceBytes(DownloadedBytes),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- ChunkCountWritten.load(),
+ WriteChunkStats.ChunkCountWritten.load(),
ChunkCountToWrite,
- NiceBytes(WriteToDiskBytes.load()),
+ NiceBytes(DiskStats.WriteByteCount.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState({.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())},
- false);
+ WriteProgressBar.UpdateState(
+ {.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
+ false);
});
}
@@ -5617,25 +5906,21 @@ namespace {
}
ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
+ +DownloadStats.DownloadedPartialBlockByteCount.load();
ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. Completed in {}",
- NiceBytes(BytesDownloaded.load()),
- NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceBytes(WriteToDiskBytes.load()),
- NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())),
NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
- }
- std::vector<std::pair<IoHash, uint32_t>> Targets;
- Targets.reserve(RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
- {
- Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
- std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
- return Lhs.first < Rhs.first;
- });
// Move all files we will reuse to cache folder
// TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
@@ -5662,6 +5947,7 @@ namespace {
if (WipeTargetFolder)
{
ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
+ Stopwatch Timer;
// Clean target folder
ZEN_CONSOLE("Wiping {}", Path);
@@ -5669,10 +5955,12 @@ namespace {
{
ZEN_WARN("Some files in {} could not be removed", Path);
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
else
{
ZEN_TRACE_CPU("UpdateFolder_RemoveUnused");
+ Stopwatch Timer;
// Remove unused tracked files
tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
@@ -5702,10 +5990,12 @@ namespace {
std::filesystem::remove(LocalFilePath);
}
}
+ RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
{
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
+ Stopwatch Timer;
WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
@@ -5719,6 +6009,16 @@ namespace {
std::atomic<uint64_t> TargetsComplete = 0;
+ std::vector<std::pair<IoHash, uint32_t>> Targets;
+ Targets.reserve(RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
+ return Lhs.first < Rhs.first;
+ });
+
size_t TargetOffset = 0;
while (TargetOffset < Targets.size())
{
@@ -5772,6 +6072,7 @@ namespace {
SetFileReadOnly(FirstTargetFilePath, false);
}
std::filesystem::rename(CacheFilePath, FirstTargetFilePath);
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
}
OutLocalFolderState.Attributes[FirstTargetPathIndex] =
@@ -5800,6 +6101,7 @@ namespace {
SetFileReadOnly(ExtraTargetFilePath, false);
}
CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
OutLocalFolderState.Attributes[ExtraTargetPathIndex] =
RemoteContent.Attributes.empty()
@@ -5834,6 +6136,8 @@ namespace {
});
}
+ RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
if (AbortFlag)
{
return;
@@ -6493,13 +6797,21 @@ namespace {
}
else
{
- ExtendableStringBuilder<128> SB;
+ ExtendableStringBuilder<128> BuildPartString;
for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts)
{
- SB.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
+ BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
FolderContent LocalFolderState;
+
+ DiskStatistics DiskStats;
+ CacheMappingStatistics CacheMappingStats;
+ DownloadStatistics DownloadStats;
+ WriteChunkStatistics WriteChunkStats;
+ RebuildFolderStateStatistics RebuildFolderStateStats;
+ VerifyFolderStatistics VerifyFolderStats;
+
UpdateFolder(Storage,
BuildId,
Path,
@@ -6511,11 +6823,16 @@ namespace {
LooseChunkHashes,
AllowPartialBlockRequests,
WipeTargetFolder,
- LocalFolderState);
+ LocalFolderState,
+ DiskStats,
+ CacheMappingStats,
+ DownloadStats,
+ WriteChunkStats,
+ RebuildFolderStateStats);
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path, PostDownloadVerify);
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
@@ -6529,8 +6846,37 @@ namespace {
CompactBinaryToJson(StateObject, SB);
WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
+ const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
+ DownloadStats.DownloadedPartialBlockCount.load();
+ const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
+ DownloadStats.DownloadedBlockByteCount.load() +
+ DownloadStats.DownloadedPartialBlockByteCount.load();
+ const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs();
- ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ ZEN_CONSOLE(
+ "Downloaded build {}, parts:{} in {}\n"
+ " Download: {} ({}) {}bits/s\n"
+ " Write: {} ({}) {}B/s\n"
+ " Clean: {}\n"
+ " Finalize: {}\n"
+ " Verify: {}",
+ BuildId,
+ BuildPartString.ToView(),
+ NiceTimeSpanMs(DownloadTimeMs),
+
+ DownloadCount,
+ NiceBytes(DownloadByteCount),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
+
+ DiskStats.WriteCount.load(),
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
}
if (CleanDirectory(ZenTempFolder, {}))
@@ -6708,7 +7054,7 @@ BuildsCommand::BuildsCommand()
m_Options.add_options()("h,help", "Print help");
auto AddAuthOptions = [this](cxxopts::Options& Ops) {
- Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value<std::filesystem::path>(m_SystemRootDir), "<systemdir>");
+ Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value<std::string>(m_SystemRootDir), "<systemdir>");
// Direct access token (may expire)
Ops.add_option("auth-token",
@@ -7050,7 +7396,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto CreateAuthMgr = [&]() {
if (!Auth)
{
- std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : m_SystemRootDir;
+ std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir);
if (m_EncryptionKey.empty())
{
@@ -7172,8 +7518,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Querying builds in folder '{}'.", StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
}
else
{
@@ -7224,9 +7571,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (m_BuildPartName.empty())
{
- m_BuildPartName = m_Path.filename().string();
+ m_BuildPartName = Path.filename().string();
}
const bool GeneratedBuildId = m_BuildId.empty();
@@ -7266,26 +7615,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'",
m_BuildPartName,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
else if (!m_StoragePath.empty())
{
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'",
m_BuildPartName,
- m_Path,
- m_StoragePath,
+ Path,
+ StoragePath,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7328,7 +7678,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartId,
m_BuildPartName,
- m_Path,
+ Path,
m_ManifestPath,
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
@@ -7397,6 +7747,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
BuildStorage::Statistics StorageStats;
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
@@ -7404,19 +7756,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Downloading Build '{}' to '{}' from {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
BuildId,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Downloading Build '{}' to '{}' from folder {}", BuildId, m_Path, m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Downloading Build '{}' to '{}' from folder {}", BuildId, Path, StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7427,7 +7780,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartIds,
m_BuildPartNames,
- m_Path,
+ Path,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
m_Clean,
@@ -7466,7 +7819,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
}
- DiffFolders(m_Path, m_DiffPath, m_OnlyChunked);
+ std::filesystem::path Path = StringToPath(m_Path);
+ DiffFolders(Path, m_DiffPath, m_OnlyChunked);
return AbortFlag ? 11 : 0;
}
@@ -7491,6 +7845,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// "07d3964f919d577a321a1fdd",
// "07d396a6ce875004e16b9528"};
+ std::filesystem::path Path = StringToPath(m_Path);
+
BuildStorage::Statistics StorageStats;
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
@@ -7498,25 +7854,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Downloading {} to '{}' from {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
FormatArray<std::string>(m_BuildIds, " "sv),
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Downloading {}'to '{}' from folder '{}'", FormatArray<std::string>(m_BuildIds, " "sv), m_Path, m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Downloading '{}' to '{}' from folder '{}'", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
+ Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
{
Oid BuildId = Oid::FromHexString(BuildIdString);
@@ -7528,7 +7886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
{},
{},
- m_Path,
+ Path,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
BuildIdString == m_BuildIds.front(),
@@ -7540,6 +7898,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\n");
}
+ ZEN_CONSOLE("Completed in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
return 0;
}
@@ -7555,8 +7914,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
+ std::filesystem::path Path = StringToPath(m_Path);
+
m_BuildId = Oid::NewOid().ToString();
- m_BuildPartName = m_Path.filename().string();
+ m_BuildPartName = Path.filename().string();
m_BuildPartId = Oid::NewOid().ToString();
m_CreateBuild = true;
@@ -7566,16 +7927,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+
+ if (m_BuildsUrl.empty() && StoragePath.empty())
{
- m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore";
- CreateDirectories(m_StoragePath);
- CleanDirectory(m_StoragePath, {});
+ m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").generic_string();
+ CreateDirectories(StoragePath);
+ CleanDirectory(StoragePath, {});
}
auto _ = MakeGuard([&]() {
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
+ if (m_BuildsUrl.empty() && StoragePath.empty())
{
- DeleteDirectories(m_StoragePath);
+ DeleteDirectories(StoragePath);
}
});
@@ -7583,24 +7946,24 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- m_Path,
+ Path,
m_BuildsUrl,
Http.GetSessionId(),
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
- else if (!m_StoragePath.empty())
+ else if (!StoragePath.empty())
{
ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- m_Path,
- m_StoragePath,
+ Path,
+ StoragePath,
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7632,7 +7995,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildId,
BuildPartId,
m_BuildPartName,
- m_Path,
+ Path,
{},
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
@@ -7646,7 +8009,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 11;
}
- const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
+ const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
@@ -7708,27 +8071,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
case 0:
{
uint64_t SourceSize = *FileSizeIt;
- if (SourceSize > 0)
+ if (SourceSize > 256)
{
Work.ScheduleWork(
GetMediumWorkerPool(EWorkloadType::Burst),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
- IoBuffer Scrambled(SourceSize);
+ bool IsReadOnly = SetFileReadOnly(FilePath, false);
{
- IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath);
- Scrambled.GetMutableView().CopyFrom(
- Source.GetView().Mid(SourceSize / 3, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid(SourceSize / 3)
- .CopyFrom(Source.GetView().Mid(0, SourceSize / 3));
- Scrambled.GetMutableView()
- .Mid((SourceSize / 3) * 2)
- .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3));
+ BasicFile Source(FilePath, BasicFile::Mode::kWrite);
+ uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u);
+ IoBuffer TempBuffer1(RangeSize);
+ IoBuffer TempBuffer2(RangeSize);
+ IoBuffer TempBuffer3(RangeSize);
+ Source.Read(TempBuffer1.GetMutableView().GetData(), RangeSize, 0);
+ Source.Read(TempBuffer2.GetMutableView().GetData(), RangeSize, SourceSize / 2);
+ Source.Read(TempBuffer3.GetMutableView().GetData(),
+ RangeSize,
+ SourceSize - RangeSize);
+ Source.Write(TempBuffer1, SourceSize / 2);
+ Source.Write(TempBuffer2, SourceSize - RangeSize);
+ Source.Write(TempBuffer3, SourceSize - 0);
}
- bool IsReadOnly = SetFileReadOnly(FilePath, false);
- WriteFile(FilePath, Scrambled);
if (IsReadOnly)
{
SetFileReadOnly(FilePath, true);
@@ -7870,6 +8235,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (!m_BuildsUrl.empty())
{
ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
@@ -7878,14 +8245,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7937,6 +8305,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<BuildStorage> Storage;
std::string StorageName;
+ std::filesystem::path Path = StringToPath(m_Path);
+
if (!m_BuildsUrl.empty())
{
ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
@@ -7945,14 +8315,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
+ Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
StorageName = ZEN_CLOUD_STORAGE;
}
else if (!m_StoragePath.empty())
{
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+ std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
+ Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ StorageName = fmt::format("Disk {}", StoragePath.stem());
}
else
{
@@ -7960,11 +8331,21 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateStatistics ValidateStats;
+ DownloadStatistics DownloadStats;
+ ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}
}
+ catch (const ParallellWorkException& Ex)
+ {
+ for (const std::string& Error : Ex.m_Errors)
+ {
+ ZEN_ERROR("{}", Error);
+ }
+ return 3;
+ }
catch (const std::exception& Ex)
{
ZEN_ERROR("{}", Ex.what());
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 167a5d29f..60953efad 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -25,7 +25,7 @@ public:
private:
cxxopts::Options m_Options{Name, Description};
- std::filesystem::path m_SystemRootDir;
+ std::string m_SystemRootDir;
bool m_PlainProgress = false;
bool m_Verbose = false;
@@ -37,20 +37,20 @@ private:
std::string m_Bucket;
// file storage
- std::filesystem::path m_StoragePath;
- bool m_WriteMetadataAsJson = false;
-
- std::string m_BuildId;
- bool m_CreateBuild = false;
- std::string m_BuildMetadataPath;
- std::string m_BuildMetadata;
- std::string m_BuildPartName; // Defaults to name of leaf folder in m_Path
- std::string m_BuildPartId; // Defaults to a generated id when creating part, looked up when downloading using m_BuildPartName
- bool m_Clean = false;
- uint8_t m_BlockReuseMinPercentLimit = 85;
- bool m_AllowMultiparts = true;
- bool m_AllowPartialBlockRequests = true;
- std::filesystem::path m_ManifestPath;
+ std::string m_StoragePath;
+ bool m_WriteMetadataAsJson = false;
+
+ std::string m_BuildId;
+ bool m_CreateBuild = false;
+ std::string m_BuildMetadataPath;
+ std::string m_BuildMetadata;
+ std::string m_BuildPartName; // Defaults to name of leaf folder in m_Path
+ std::string m_BuildPartId; // Defaults to a generated id when creating part, looked up when downloading using m_BuildPartName
+ bool m_Clean = false;
+ uint8_t m_BlockReuseMinPercentLimit = 85;
+ bool m_AllowMultiparts = true;
+ bool m_AllowPartialBlockRequests = true;
+ std::string m_ManifestPath;
// Direct access token (may expire)
std::string m_AccessToken;
@@ -76,7 +76,7 @@ private:
cxxopts::Options m_ListOptions{"list", "List available builds"};
- std::filesystem::path m_Path;
+ std::string m_Path;
cxxopts::Options m_UploadOptions{"upload", "Upload a folder"};
bool m_PostUploadVerify = false;
@@ -86,9 +86,9 @@ private:
std::vector<std::string> m_BuildPartIds;
bool m_PostDownloadVerify = false;
- cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"};
- std::filesystem::path m_DiffPath;
- bool m_OnlyChunked = false;
+ cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"};
+ std::string m_DiffPath;
+ bool m_OnlyChunked = false;
cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"};
diff --git a/src/zen/cmds/status_cmd.cpp b/src/zen/cmds/status_cmd.cpp
index 16754e747..4d1534e05 100644
--- a/src/zen/cmds/status_cmd.cpp
+++ b/src/zen/cmds/status_cmd.cpp
@@ -32,16 +32,17 @@ StatusCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
uint16_t EffectivePort = 0;
if (!m_DataDir.empty())
{
- if (!std::filesystem::is_regular_file(m_DataDir / ".lock"))
+ std::filesystem::path DataDir = StringToPath(m_DataDir);
+ if (!std::filesystem::is_regular_file(DataDir / ".lock"))
{
- ZEN_CONSOLE("lock file does not exist in directory '{}'", m_DataDir);
+ ZEN_CONSOLE("lock file does not exist in directory '{}'", DataDir);
return 1;
}
- LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(m_DataDir / ".lock")));
+ LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(DataDir / ".lock")));
std::string Reason;
if (!ValidateLockFileInfo(Info, Reason))
{
- ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", m_DataDir, Reason);
+ ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", DataDir, Reason);
return 1;
}
EffectivePort = Info.EffectiveListenPort;
diff --git a/src/zen/cmds/status_cmd.h b/src/zen/cmds/status_cmd.h
index 46bda9ee6..00ad0e758 100644
--- a/src/zen/cmds/status_cmd.h
+++ b/src/zen/cmds/status_cmd.h
@@ -20,9 +20,9 @@ public:
private:
int GetLockFileEffectivePort() const;
- cxxopts::Options m_Options{"status", "Show zen status"};
- uint16_t m_Port = 0;
- std::filesystem::path m_DataDir;
+ cxxopts::Options m_Options{"status", "Show zen status"};
+ uint16_t m_Port = 0;
+ std::string m_DataDir;
};
} // namespace zen
diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp
index ac2f42a86..44a41146c 100644
--- a/src/zen/cmds/up_cmd.cpp
+++ b/src/zen/cmds/up_cmd.cpp
@@ -77,13 +77,15 @@ UpCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- if (m_ProgramBaseDir.empty())
+ std::filesystem::path ProgramBaseDir = StringToPath(m_ProgramBaseDir);
+
+ if (ProgramBaseDir.empty())
{
std::filesystem::path ExePath = zen::GetRunningExecutablePath();
- m_ProgramBaseDir = ExePath.parent_path();
+ ProgramBaseDir = ExePath.parent_path();
}
ZenServerEnvironment ServerEnvironment;
- ServerEnvironment.Initialize(m_ProgramBaseDir);
+ ServerEnvironment.Initialize(ProgramBaseDir);
ZenServerInstance Server(ServerEnvironment);
std::string ServerArguments = GlobalOptions.PassthroughCommandLine;
if ((m_Port != 0) && (ServerArguments.find("--port"sv) == std::string::npos))
@@ -153,18 +155,20 @@ AttachCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Instance.Sweep();
ZenServerState::ZenServerEntry* Entry = Instance.Lookup(m_Port);
- if (!m_DataDir.empty())
+ std::filesystem::path DataDir = StringToPath(m_DataDir);
+
+ if (!DataDir.empty())
{
- if (!std::filesystem::is_regular_file(m_DataDir / ".lock"))
+ if (!std::filesystem::is_regular_file(DataDir / ".lock"))
{
- ZEN_CONSOLE("lock file does not exist in directory '{}'", m_DataDir);
+ ZEN_CONSOLE("lock file does not exist in directory '{}'", DataDir);
return 1;
}
- LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(m_DataDir / ".lock")));
+ LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(DataDir / ".lock")));
std::string Reason;
if (!ValidateLockFileInfo(Info, Reason))
{
- ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", m_DataDir, Reason);
+ ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", DataDir, Reason);
return 1;
}
Entry = Instance.LookupByEffectivePort(Info.EffectiveListenPort);
@@ -214,24 +218,27 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Instance.Initialize();
ZenServerState::ZenServerEntry* Entry = Instance.Lookup(m_Port);
- if (m_ProgramBaseDir.empty())
+ std::filesystem::path ProgramBaseDir = StringToPath(m_ProgramBaseDir);
+ if (ProgramBaseDir.empty())
{
- std::filesystem::path ExePath = zen::GetRunningExecutablePath();
- m_ProgramBaseDir = ExePath.parent_path();
+ std::filesystem::path ExePath = GetRunningExecutablePath();
+ ProgramBaseDir = ExePath.parent_path();
}
- if (!m_DataDir.empty())
+ std::filesystem::path DataDir = StringToPath(m_DataDir);
+
+ if (!DataDir.empty())
{
- if (!std::filesystem::is_regular_file(m_DataDir / ".lock"))
+ if (!std::filesystem::is_regular_file(DataDir / ".lock"))
{
- ZEN_CONSOLE("lock file does not exist in directory '{}'", m_DataDir);
+ ZEN_CONSOLE("lock file does not exist in directory '{}'", DataDir);
return 1;
}
- LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(m_DataDir / ".lock")));
+ LockFileInfo Info = ReadLockFilePayload(LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(DataDir / ".lock")));
std::string Reason;
if (!ValidateLockFileInfo(Info, Reason))
{
- ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", m_DataDir, Reason);
+ ZEN_CONSOLE("lock file in directory '{}' is not valid. Reason: '{}'", DataDir, Reason);
return 1;
}
Entry = Instance.LookupByEffectivePort(Info.EffectiveListenPort);
@@ -244,7 +251,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
try
{
ZenServerEnvironment ServerEnvironment;
- ServerEnvironment.Initialize(m_ProgramBaseDir);
+ ServerEnvironment.Initialize(ProgramBaseDir);
ZenServerInstance Server(ServerEnvironment);
Server.AttachToRunningServer(EntryPort);
@@ -309,7 +316,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_ForceTerminate)
{
// Try to find the running executable by path name
- std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
+ std::filesystem::path ServerExePath = ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec)
{
diff --git a/src/zen/cmds/up_cmd.h b/src/zen/cmds/up_cmd.h
index c9af16749..32d8ddab3 100644
--- a/src/zen/cmds/up_cmd.h
+++ b/src/zen/cmds/up_cmd.h
@@ -18,11 +18,11 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{"up", "Bring up zen service"};
- uint16_t m_Port = 0;
- bool m_ShowConsole = false;
- bool m_ShowLog = false;
- std::filesystem::path m_ProgramBaseDir;
+ cxxopts::Options m_Options{"up", "Bring up zen service"};
+ uint16_t m_Port = 0;
+ bool m_ShowConsole = false;
+ bool m_ShowLog = false;
+ std::string m_ProgramBaseDir;
};
class AttachCommand : public ZenCmdBase
@@ -35,10 +35,10 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{"attach", "Add a sponsor process to a running zen service"};
- uint16_t m_Port = 0;
- int m_OwnerPid = 0;
- std::filesystem::path m_DataDir;
+ cxxopts::Options m_Options{"attach", "Add a sponsor process to a running zen service"};
+ uint16_t m_Port = 0;
+ int m_OwnerPid = 0;
+ std::string m_DataDir;
};
class DownCommand : public ZenCmdBase
@@ -51,11 +51,11 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{"down", "Bring down zen service"};
- uint16_t m_Port = 0;
- bool m_ForceTerminate = false;
- std::filesystem::path m_ProgramBaseDir;
- std::filesystem::path m_DataDir;
+ cxxopts::Options m_Options{"down", "Bring down zen service"};
+ uint16_t m_Port = 0;
+ bool m_ForceTerminate = false;
+ std::string m_ProgramBaseDir;
+ std::string m_DataDir;
};
} // namespace zen
diff --git a/src/zen/cmds/workspaces_cmd.cpp b/src/zen/cmds/workspaces_cmd.cpp
index 166d4218d..5f3f8f7ca 100644
--- a/src/zen/cmds/workspaces_cmd.cpp
+++ b/src/zen/cmds/workspaces_cmd.cpp
@@ -25,16 +25,7 @@ namespace {
if (!Path.empty())
{
std::u8string PathString = Path.u8string();
- if (PathString.ends_with(std::filesystem::path::preferred_separator) || PathString.ends_with('/'))
- {
- PathString.pop_back();
- Path = std::filesystem::path(PathString);
- }
- // Special case if user gives a path with quotes and includes a backslash at the end:
- // ="path\" cxxopts strips the leading quote only but not the trailing.
- // As we expect paths here and we don't want trailing slashes we strip away the quote
- // manually if the string does not start with a quote UE-231677
- else if (PathString[0] != '\"' && PathString[PathString.length() - 1] == '\"')
+ if (PathString.ends_with(std::filesystem::path::preferred_separator) || PathString.starts_with('/'))
{
PathString.pop_back();
Path = std::filesystem::path(PathString);
@@ -96,7 +87,7 @@ WorkspaceCommand::WorkspaceCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options.add_options()("system-dir", "Specify system root", cxxopts::value<std::filesystem::path>(m_SystemRootDir));
+ m_Options.add_options()("system-dir", "Specify system root", cxxopts::value(m_SystemRootDir));
m_Options.add_option("", "v", "verb", "Verb for workspace - create, remove, info", cxxopts::value(m_Verb), "<verb>");
m_Options.parse_positional({"verb"});
m_Options.positional_help("verb");
@@ -148,16 +139,18 @@ WorkspaceCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_HostName = ResolveTargetHostSpec(m_HostName);
- if (m_SystemRootDir.empty())
+ std::filesystem::path SystemRootDir = StringToPath(m_SystemRootDir);
+
+ if (SystemRootDir.empty())
{
- m_SystemRootDir = PickDefaultSystemRootDirectory();
- if (m_SystemRootDir.empty())
+ SystemRootDir = PickDefaultSystemRootDirectory();
+ if (SystemRootDir.empty())
{
throw zen::OptionParseException("unable to resolve system root directory");
}
}
- std::filesystem::path StatePath = m_SystemRootDir / "workspaces";
+ std::filesystem::path StatePath = SystemRootDir / "workspaces";
if (!ParseOptions(*SubOption, gsl::narrow<int>(SubCommandArguments.size()), SubCommandArguments.data()))
{
@@ -171,12 +164,12 @@ WorkspaceCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("path is required\n{}", m_CreateOptions.help()));
}
- RemoveTrailingPathSeparator(m_Path);
+ std::filesystem::path Path = StringToPath(m_Path);
if (m_Id.empty())
{
- m_Id = Workspaces::PathToId(m_Path).ToString();
- ZEN_CONSOLE("Using generated workspace id {} from path '{}'", m_Id, m_Path);
+ m_Id = Workspaces::PathToId(Path).ToString();
+ ZEN_CONSOLE("Using generated workspace id {} from path '{}'", m_Id, Path);
}
if (Oid::TryFromHexString(m_Id) == Oid::Zero)
@@ -187,7 +180,7 @@ WorkspaceCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (Workspaces::AddWorkspace(
Log(),
StatePath,
- {.Id = Oid::FromHexString(m_Id), .RootPath = m_Path, .AllowShareCreationFromHttp = m_AllowShareCreationFromHttp}))
+ {.Id = Oid::FromHexString(m_Id), .RootPath = Path, .AllowShareCreationFromHttp = m_AllowShareCreationFromHttp}))
{
if (!m_HostName.empty())
{
@@ -287,7 +280,7 @@ WorkspaceShareCommand::WorkspaceShareCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options.add_options()("system-dir", "Specify system root", cxxopts::value<std::filesystem::path>(m_SystemRootDir));
+ m_Options.add_options()("system-dir", "Specify system root", cxxopts::value(m_SystemRootDir));
m_Options.add_option("", "v", "verb", "Verb for workspace - create, remove, info", cxxopts::value(m_Verb), "<verb>");
m_Options.parse_positional({"verb"});
m_Options.positional_help("verb");
@@ -399,16 +392,18 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
m_HostName = ResolveTargetHostSpec(m_HostName);
- if (m_SystemRootDir.empty())
+ std::filesystem::path SystemRootDir = StringToPath(m_SystemRootDir);
+
+ if (SystemRootDir.empty())
{
- m_SystemRootDir = PickDefaultSystemRootDirectory();
- if (m_SystemRootDir.empty())
+ SystemRootDir = PickDefaultSystemRootDirectory();
+ if (SystemRootDir.empty())
{
throw zen::OptionParseException("unable to resolve system root directory");
}
}
- std::filesystem::path StatePath = m_SystemRootDir / "workspaces";
+ std::filesystem::path StatePath = SystemRootDir / "workspaces";
if (!ParseOptions(*SubOption, gsl::narrow<int>(SubCommandArguments.size()), SubCommandArguments.data()))
{
@@ -417,7 +412,8 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
if (SubOption == &m_CreateOptions)
{
- if (m_WorkspaceRoot.empty())
+ std::filesystem::path WorkspaceRoot = StringToPath(m_WorkspaceRoot);
+ if (WorkspaceRoot.empty())
{
if (m_WorkspaceId.empty())
{
@@ -436,15 +432,15 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("Workspace {} does not exist", m_WorkspaceId);
return 0;
}
- m_WorkspaceRoot = WorkspaceConfig.RootPath;
+ WorkspaceRoot = WorkspaceConfig.RootPath;
}
else
{
- RemoveTrailingPathSeparator(m_WorkspaceRoot);
+ RemoveTrailingPathSeparator(WorkspaceRoot);
if (m_WorkspaceId.empty())
{
- m_WorkspaceId = Workspaces::PathToId(m_WorkspaceRoot).ToString();
- ZEN_CONSOLE("Using generated workspace id {} from path '{}'", m_WorkspaceId, m_WorkspaceRoot);
+ m_WorkspaceId = Workspaces::PathToId(WorkspaceRoot).ToString();
+ ZEN_CONSOLE("Using generated workspace id {} from path '{}'", m_WorkspaceId, WorkspaceRoot);
}
else
{
@@ -453,23 +449,25 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
throw zen::OptionParseException(fmt::format("workspace id '{}' is invalid", m_WorkspaceId));
}
}
- if (Workspaces::AddWorkspace(Log(), StatePath, {.Id = Oid::FromHexString(m_WorkspaceId), .RootPath = m_WorkspaceRoot}))
+ if (Workspaces::AddWorkspace(Log(), StatePath, {.Id = Oid::FromHexString(m_WorkspaceId), .RootPath = WorkspaceRoot}))
{
- ZEN_CONSOLE("Created workspace {} using root path '{}'", m_WorkspaceId, m_WorkspaceRoot);
+ ZEN_CONSOLE("Created workspace {} using root path '{}'", m_WorkspaceId, WorkspaceRoot);
}
else
{
- ZEN_CONSOLE("Using existing workspace {} with root path '{}'", m_WorkspaceId, m_WorkspaceRoot);
+ ZEN_CONSOLE("Using existing workspace {} with root path '{}'", m_WorkspaceId, WorkspaceRoot);
}
}
- RemoveTrailingPathSeparator(m_SharePath);
- RemoveLeadingPathSeparator(m_SharePath);
+ std::filesystem::path SharePath = StringToPath(m_SharePath);
+
+ RemoveLeadingPathSeparator(SharePath);
+ RemoveTrailingPathSeparator(SharePath);
if (m_ShareId.empty())
{
- m_ShareId = Workspaces::PathToId(m_SharePath).ToString();
- ZEN_CONSOLE("Using generated share id {}, for path '{}'", m_ShareId, m_SharePath);
+ m_ShareId = Workspaces::PathToId(SharePath).ToString();
+ ZEN_CONSOLE("Using generated share id {}, for path '{}'", m_ShareId, SharePath);
}
if (Oid::TryFromHexString(m_ShareId) == Oid::Zero)
@@ -478,8 +476,8 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
}
if (Workspaces::AddWorkspaceShare(Log(),
- m_WorkspaceRoot,
- {.Id = Oid::FromHexString(m_ShareId), .SharePath = m_SharePath, .Alias = m_Alias}))
+ WorkspaceRoot,
+ {.Id = Oid::FromHexString(m_ShareId), .SharePath = SharePath, .Alias = m_Alias}))
{
if (!m_HostName.empty())
{
@@ -531,7 +529,8 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("Workspace {} does not exist", m_WorkspaceId);
return 0;
}
- m_WorkspaceRoot = WorkspaceConfig.RootPath;
+
+ std::filesystem::path WorkspaceRoot = WorkspaceConfig.RootPath;
if (m_ShareId.empty())
{
@@ -543,8 +542,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
throw zen::OptionParseException(fmt::format("workspace id '{}' is invalid", m_ShareId));
}
- Workspaces::WorkspaceShareConfiguration Share =
- Workspaces::FindWorkspaceShare(Log(), m_WorkspaceRoot, Oid::FromHexString(m_ShareId));
+ Workspaces::WorkspaceShareConfiguration Share = Workspaces::FindWorkspaceShare(Log(), WorkspaceRoot, Oid::FromHexString(m_ShareId));
if (Share.Id == Oid::Zero)
{
ZEN_CONSOLE("Workspace share {} does not exist", m_ShareId);
@@ -556,6 +554,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
if (SubOption == &m_RemoveOptions)
{
+ std::filesystem::path WorkspaceRoot;
if (!m_Alias.empty())
{
Workspaces::WorkspaceConfiguration WorkspaceConfig;
@@ -566,9 +565,9 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("Workspace share with alias {} does not exist", m_Alias);
return 0;
}
- m_ShareId = ShareConfig.Id.ToString();
- m_WorkspaceId = WorkspaceConfig.Id.ToString();
- m_WorkspaceRoot = WorkspaceConfig.RootPath;
+ m_ShareId = ShareConfig.Id.ToString();
+ m_WorkspaceId = WorkspaceConfig.Id.ToString();
+ WorkspaceRoot = WorkspaceConfig.RootPath;
}
if (m_WorkspaceId.empty())
@@ -587,7 +586,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("Workspace {} does not exist", m_WorkspaceId);
return 0;
}
- m_WorkspaceRoot = WorkspaceConfig.RootPath;
+ WorkspaceRoot = WorkspaceConfig.RootPath;
if (m_ShareId.empty())
{
@@ -599,7 +598,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
throw zen::OptionParseException(fmt::format("workspace id '{}' is invalid", m_ShareId));
}
- if (Workspaces::RemoveWorkspaceShare(Log(), m_WorkspaceRoot, Oid::FromHexString(m_ShareId)))
+ if (Workspaces::RemoveWorkspaceShare(Log(), WorkspaceRoot, Oid::FromHexString(m_ShareId)))
{
if (!m_HostName.empty())
{
diff --git a/src/zen/cmds/workspaces_cmd.h b/src/zen/cmds/workspaces_cmd.h
index de0edd061..86452e25e 100644
--- a/src/zen/cmds/workspaces_cmd.h
+++ b/src/zen/cmds/workspaces_cmd.h
@@ -21,17 +21,17 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{Name, Description};
- std::string m_HostName;
- std::filesystem::path m_SystemRootDir;
+ cxxopts::Options m_Options{Name, Description};
+ std::string m_HostName;
+ std::string m_SystemRootDir;
std::string m_Verb; // create, info, remove
std::string m_Id;
- cxxopts::Options m_CreateOptions{"create", "Create a workspace"};
- std::filesystem::path m_Path;
- bool m_AllowShareCreationFromHttp = false;
+ cxxopts::Options m_CreateOptions{"create", "Create a workspace"};
+ std::string m_Path;
+ bool m_AllowShareCreationFromHttp = false;
cxxopts::Options m_InfoOptions{"info", "Info about a workspace"};
@@ -53,17 +53,17 @@ public:
virtual cxxopts::Options& Options() override { return m_Options; }
private:
- cxxopts::Options m_Options{Name, Description};
- std::string m_HostName;
- std::filesystem::path m_SystemRootDir;
- std::string m_WorkspaceId;
- std::filesystem::path m_WorkspaceRoot;
- std::string m_Verb; // create, info, remove
- std::string m_ShareId;
- std::string m_Alias;
-
- cxxopts::Options m_CreateOptions{"create", "Create a workspace share"};
- std::filesystem::path m_SharePath;
+ cxxopts::Options m_Options{Name, Description};
+ std::string m_HostName;
+ std::string m_SystemRootDir;
+ std::string m_WorkspaceId;
+ std::string m_WorkspaceRoot;
+ std::string m_Verb; // create, info, remove
+ std::string m_ShareId;
+ std::string m_Alias;
+
+ cxxopts::Options m_CreateOptions{"create", "Create a workspace share"};
+ std::string m_SharePath;
bool m_Refresh = false;
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 9d0eab7dc..6f831349b 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -29,6 +29,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/trace.h>
@@ -414,6 +415,23 @@ ProgressBar::HasActiveTask() const
int
main(int argc, char** argv)
{
+ std::vector<std::string> Args;
+#if ZEN_PLATFORM_WINDOWS
+ LPWSTR RawCommandLine = GetCommandLine();
+ std::string CommandLine = zen::WideToUtf8(RawCommandLine);
+ Args = zen::ParseCommandLine(CommandLine);
+#else
+ Args.reserve(argc);
+ for (int I = 0; I < argc; I++)
+ {
+ Args.push_back(std::string(argv[I]));
+ }
+#endif
+ std::vector<char*> RawArgs = zen::StripCommandlineQuotes(Args);
+
+ argc = gsl::narrow<int>(RawArgs.size());
+ argv = RawArgs.data();
+
using namespace zen;
using namespace std::literals;
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index 95876cff4..a181bbd66 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -796,6 +796,12 @@ BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
{
if (m_Buffer == nullptr || (Size >= m_BufferSize))
{
+ if (FileOffset == m_BufferEnd)
+ {
+ Flush();
+ m_BufferStart = m_BufferEnd = FileOffset + Size;
+ }
+
m_Base.Write(Data, Size, FileOffset);
return;
}
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 88c3bb5b9..ad6b6103c 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -158,9 +158,10 @@ class BaseEncoder
{
public:
[[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0;
- [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize = DefaultBlockSize) const = 0;
+ [[nodiscard]] virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const = 0;
};
class BaseDecoder
@@ -189,11 +190,13 @@ public:
uint64_t RawOffset,
uint64_t RawSize) const = 0;
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0;
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const = 0;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -207,13 +210,14 @@ public:
return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned());
}
- [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t /* BlockSize */) const final
+ [[nodiscard]] virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t /* BlockSize */) const final
{
UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
- Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
- Callback(HeaderData.GetSize(), RawData);
+ Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
+ Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData);
return true;
}
};
@@ -283,17 +287,19 @@ public:
[[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); }
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const final
{
if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() &&
Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize &&
(RawOffset + RawSize) <= Header.TotalRawSize)
{
- if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)))
+ if (!Callback(sizeof(BufferHeader) + RawOffset, RawSize, 0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)))
{
return false;
}
@@ -309,9 +315,10 @@ class BlockEncoder : public BaseEncoder
{
public:
virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final;
- virtual bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize) const final;
+ virtual bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize) const final;
protected:
virtual CompressionMethod GetMethod() const = 0;
@@ -460,9 +467,10 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
}
bool
-BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- uint64_t BlockSize = DefaultBlockSize) const
+BlockEncoder::CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const
{
ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31)));
@@ -504,13 +512,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
uint64_t CompressedBlockSize = CompressedBlock.GetSize();
if (RawBlockSize <= CompressedBlockSize)
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(FileRef.FileChunkOffset + RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize)));
CompressedBlockSize = RawBlockSize;
}
else
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(FileRef.FileChunkOffset + RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
}
@@ -540,12 +552,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
uint64_t CompressedBlockSize = CompressedBlock.GetSize();
if (RawBlockSize <= CompressedBlockSize)
{
- Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
+ Callback(RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
CompressedBlockSize = RawBlockSize;
}
else
{
- Callback(FullHeaderSize + CompressedSize,
+ Callback(RawOffset,
+ RawBlockSize,
+ FullHeaderSize + CompressedSize,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
}
@@ -582,7 +599,7 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes));
Header.Write(HeaderBuffer.GetMutableView());
- Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
+ Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
return true;
}
@@ -615,11 +632,13 @@ public:
MutableMemoryView RawView,
uint64_t RawOffset) const final;
- virtual bool DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final;
+ virtual bool DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback)
+ const final;
protected:
virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0;
@@ -743,11 +762,12 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB
}
bool
-BlockDecoder::DecompressToStream(const BufferHeader& Header,
- const CompositeBuffer& CompressedData,
- uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+BlockDecoder::DecompressToStream(
+ const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
if (Header.TotalCompressedSize != CompressedData.GetSize())
{
@@ -817,7 +837,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
Source.Detach();
return false;
}
- if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ if (!Callback(FileRef.FileChunkOffset + CompressedOffset,
+ CompressedBlockSize,
+ BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
{
Source.Detach();
@@ -827,6 +849,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
else
{
if (!Callback(
+ FileRef.FileChunkOffset + CompressedOffset,
+ BytesToUncompress,
BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(
IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
@@ -870,7 +894,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
{
return false;
}
- if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ if (!Callback(CompressedOffset,
+ UncompressedBlockSize,
+ BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))))
{
return false;
@@ -879,6 +905,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header,
else
{
if (!Callback(
+ CompressedOffset,
+ BytesToUncompress,
BlockIndex * BlockSize + OffsetInFirstBlock,
CompositeBuffer(
IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))))
@@ -1778,11 +1806,12 @@ CompressedBuffer::Compress(const SharedBuffer& RawData,
}
bool
-CompressedBuffer::CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- OodleCompressor Compressor,
- OodleCompressionLevel CompressionLevel,
- uint64_t BlockSize)
+CompressedBuffer::CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor,
+ OodleCompressionLevel CompressionLevel,
+ uint64_t BlockSize)
{
using namespace detail;
@@ -1995,9 +2024,10 @@ CompressedBuffer::DecompressToComposite() const
}
bool
-CompressedBuffer::DecompressToStream(uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+CompressedBuffer::DecompressToStream(
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
{
using namespace detail;
if (CompressedData)
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 9f3f4f7fc..05e2bf049 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2043,6 +2043,19 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly)
return false;
}
+std::filesystem::path
+StringToPath(const std::string_view& Path)
+{
+ if (Path.length() > 2 && Path.front() == '\"' && Path.back() == '\"')
+ {
+ return std::filesystem::path(Path.substr(1, Path.length() - 2)).make_preferred();
+ }
+ else
+ {
+ return std::filesystem::path(Path).make_preferred();
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 74fd5f767..09fa6249d 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -74,11 +74,12 @@ public:
OodleCompressor Compressor = OodleCompressor::Mermaid,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
uint64_t BlockSize = 0);
- [[nodiscard]] ZENCORE_API static bool CompressToStream(const CompositeBuffer& RawData,
- std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
- OodleCompressor Compressor = OodleCompressor::Mermaid,
- OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
- uint64_t BlockSize = 0);
+ [[nodiscard]] ZENCORE_API static bool CompressToStream(
+ const CompositeBuffer& RawData,
+ std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor = OodleCompressor::Mermaid,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
+ uint64_t BlockSize = 0);
/**
* Construct from a compressed buffer previously created by Compress().
@@ -207,9 +208,10 @@ public:
*
* @return True if the buffer is valid and can be decompressed.
*/
- [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset,
- uint64_t RawSize,
- std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
+ [[nodiscard]] ZENCORE_API bool DecompressToStream(
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
/** A null compressed buffer. */
static const CompressedBuffer Null;
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index e020668fc..9a2b15d1d 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -292,6 +292,8 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly);
+std::filesystem::path StringToPath(const std::string_view& Path);
+
//////////////////////////////////////////////////////////////////////////
void filesystem_forcelink(); // internal
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index d1394cd9a..0c5931ba0 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -100,6 +100,9 @@ int GetProcessId(CreateProcResult ProcId);
std::filesystem::path GetProcessExecutablePath(int Pid, std::error_code& OutEc);
std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle);
+std::vector<std::string> ParseCommandLine(std::string_view CommandLine);
+std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs);
+
void process_forcelink(); // internal
} // namespace zen
diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp
index c51e8f69d..0761521dc 100644
--- a/src/zencore/process.cpp
+++ b/src/zencore/process.cpp
@@ -1047,6 +1047,118 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
#endif // ZEN_PLATFORM_LINUX
}
+std::vector<std::string>
+ParseCommandLine(std::string_view CommandLine)
+{
+ auto IsWhitespaceOrEnd = [](std::string_view CommandLine, std::string::size_type Pos) {
+ if (Pos == CommandLine.length())
+ {
+ return true;
+ }
+ if (CommandLine[Pos] == ' ')
+ {
+ return true;
+ }
+ return false;
+ };
+
+ bool IsParsingArg = false;
+ bool IsInQuote = false;
+
+ std::string::size_type Pos = 0;
+ std::string::size_type ArgStart = 0;
+ std::vector<std::string> Args;
+ while (Pos < CommandLine.length())
+ {
+ if (IsInQuote)
+ {
+ if (CommandLine[Pos] == '"' && IsWhitespaceOrEnd(CommandLine, Pos + 1))
+ {
+ Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart + 1)));
+ Pos++;
+ IsInQuote = false;
+ IsParsingArg = false;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ else if (IsParsingArg)
+ {
+ ZEN_ASSERT(Pos > ArgStart);
+ if (CommandLine[Pos] == ' ')
+ {
+ Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart)));
+ Pos++;
+ IsParsingArg = false;
+ }
+ else if (CommandLine[Pos] == '"')
+ {
+ IsInQuote = true;
+ Pos++;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ else if (CommandLine[Pos] == '"')
+ {
+ IsInQuote = true;
+ IsParsingArg = true;
+ ArgStart = Pos;
+ Pos++;
+ }
+ else if (CommandLine[Pos] != ' ')
+ {
+ IsParsingArg = true;
+ ArgStart = Pos;
+ Pos++;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ if (IsParsingArg)
+ {
+ ZEN_ASSERT(Pos > ArgStart);
+ Args.push_back(std::string(CommandLine.substr(ArgStart)));
+ }
+
+ return Args;
+}
+
+std::vector<char*>
+StripCommandlineQuotes(std::vector<std::string>& InOutArgs)
+{
+ std::vector<char*> RawArgs;
+ RawArgs.reserve(InOutArgs.size());
+ for (std::string& Arg : InOutArgs)
+ {
+ std::string::size_type EscapedQuotePos = Arg.find("\\\"", 1);
+ while (EscapedQuotePos != std::string::npos && Arg.rfind('\"', EscapedQuotePos - 1) != std::string::npos)
+ {
+ Arg.erase(EscapedQuotePos, 1);
+ EscapedQuotePos = Arg.find("\\\"", EscapedQuotePos);
+ }
+
+ if (Arg.starts_with("\""))
+ {
+ if (Arg.find('"', 1) == Arg.length() - 1)
+ {
+ if (Arg.find(' ', 1) == std::string::npos)
+ {
+ Arg = Arg.substr(1, Arg.length() - 2);
+ }
+ }
+ }
+ RawArgs.push_back(const_cast<char*>(Arg.c_str()));
+ }
+ return RawArgs;
+}
+
#if ZEN_WITH_TESTS
void
@@ -1123,6 +1235,36 @@ TEST_CASE("BuildArgV")
}
}
+TEST_CASE("CommandLine")
+{
+ std::vector<std::string> v1 = ParseCommandLine("c:\\my\\exe.exe \"quoted arg\" \"one\",two,\"three\\\"");
+ CHECK_EQ(v1[0], "c:\\my\\exe.exe");
+ CHECK_EQ(v1[1], "\"quoted arg\"");
+ CHECK_EQ(v1[2], "\"one\",two,\"three\\\"");
+
+ std::vector<std::string> v2 = ParseCommandLine(
+ "--tracehost 127.0.0.1 builds download --url=https://jupiter.devtools.epicgames.com --namespace=ue.oplog "
+ "--bucket=citysample.packaged-build.fortnite-main.windows \"c:\\just\\a\\path\" "
+ "--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\" \"D:\\Dev\\Spaced Folder\\Target\\\" "
+ "--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\\\" 07dn23ifiwesnvoasjncasab --build-part-name win64,linux,ps5");
+
+ std::vector<char*> v2Stripped = StripCommandlineQuotes(v2);
+ CHECK_EQ(v2Stripped[0], std::string("--tracehost"));
+ CHECK_EQ(v2Stripped[1], std::string("127.0.0.1"));
+ CHECK_EQ(v2Stripped[2], std::string("builds"));
+ CHECK_EQ(v2Stripped[3], std::string("download"));
+ CHECK_EQ(v2Stripped[4], std::string("--url=https://jupiter.devtools.epicgames.com"));
+ CHECK_EQ(v2Stripped[5], std::string("--namespace=ue.oplog"));
+ CHECK_EQ(v2Stripped[6], std::string("--bucket=citysample.packaged-build.fortnite-main.windows"));
+ CHECK_EQ(v2Stripped[7], std::string("c:\\just\\a\\path"));
+ CHECK_EQ(v2Stripped[8], std::string("--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\""));
+ CHECK_EQ(v2Stripped[9], std::string("\"D:\\Dev\\Spaced Folder\\Target\""));
+ CHECK_EQ(v2Stripped[10], std::string("--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\""));
+ CHECK_EQ(v2Stripped[11], std::string("07dn23ifiwesnvoasjncasab"));
+ CHECK_EQ(v2Stripped[12], std::string("--build-part-name"));
+ CHECK_EQ(v2Stripped[13], std::string("win64,linux,ps5"));
+}
+
TEST_SUITE_END(/* core.process */);
#endif
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
index 79798fc8d..8ea77c65d 100644
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ b/src/zenutil/include/zenutil/parallellwork.h
@@ -9,6 +9,14 @@
#include <atomic>
+class ParallellWorkException : public std::runtime_error
+{
+public:
+ explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {}
+
+ const std::vector<std::string> m_Errors;
+};
+
namespace zen {
class ParallellWork
@@ -95,13 +103,7 @@ public:
}
else if (m_Errors.size() > 1)
{
- ExtendableStringBuilder<128> SB;
- SB.Append("Multiple errors:");
- for (const std::string& Error : m_Errors)
- {
- SB.Append(fmt::format("\n {}", Error));
- }
- throw std::runtime_error(SB.ToString());
+ throw ParallellWorkException(std::move(m_Errors));
}
}
Latch& PendingWork() { return m_PendingWork; }