aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2025-10-14 18:24:17 +0200
committer: GitHub Enterprise <[email protected]> 2025-10-14 18:24:17 +0200
commit: 74c90983853f5ef0966be35544173d6efd85db17 (patch)
tree: e4fe6faa53697f2b1503e3919527d5ad18260890 /src
parent: refactor builds cmd part2 (#572) (diff)
download: zen-74c90983853f5ef0966be35544173d6efd85db17.tar.xz
download: zen-74c90983853f5ef0966be35544173d6efd85db17.zip
refactor builds cmd part3 (#573)
* move lambdas to member functions
* add BuildsOperationValidateBuildPart
Diffstat (limited to 'src')
-rw-r--r-- src/zen/cmds/builds_cmd.cpp 511
-rw-r--r-- src/zenremotestore/builds/buildstorageoperations.cpp 724
-rw-r--r-- src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h 75
3 files changed, 649 insertions, 661 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index b60844ea1..6a0a1a0da 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -66,46 +66,6 @@ namespace zen {
using namespace std::literals;
namespace {
- namespace zenutil {
-
-#if ZEN_PLATFORM_WINDOWS
- class SecurityAttributes
- {
- public:
- inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }
-
- protected:
- SECURITY_ATTRIBUTES m_Attributes{};
- SECURITY_DESCRIPTOR m_Sd{};
- };
-
- // Security attributes which allows any user access
-
- class AnyUserSecurityAttributes : public SecurityAttributes
- {
- public:
- AnyUserSecurityAttributes()
- {
- m_Attributes.nLength = sizeof m_Attributes;
- m_Attributes.bInheritHandle = false; // Disable inheritance
-
- const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
-
- if (Success)
- {
- if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
- {
- ThrowLastError("SetSecurityDescriptorDacl failed");
- }
-
- m_Attributes.lpSecurityDescriptor = &m_Sd;
- }
- }
- };
-#endif // ZEN_PLATFORM_WINDOWS
-
- } // namespace zenutil
-
static std::atomic<bool> AbortFlag = false;
static std::atomic<bool> PauseFlag = false;
@@ -283,14 +243,6 @@ namespace {
// std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json";
// }
- std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath)
- {
- return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences
- }
- std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath)
- {
- return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks
- }
std::filesystem::path UploadTempDirectory(const std::filesystem::path& Path)
{
const std::u8string LocalPathString = Path.generic_u8string();
@@ -298,11 +250,6 @@ namespace {
return std::filesystem::temp_directory_path() / fmt::format("zen_{}", PathHash);
}
- std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath)
- {
- return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks
- }
-
const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
const std::string UnsyncFolderName = ".unsync";
@@ -1159,406 +1106,35 @@ namespace {
TemporaryFile::SafeWriteFile(WritePath, JsonPayload);
}
- CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize)
- {
- ZEN_TRACE_CPU("ValidateBlob");
-
- if (Payload.GetContentType() != ZenContentType::kCompressedBinary)
- {
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'",
- BlobHash,
- Payload.GetSize(),
- ToString(Payload.GetContentType())));
- }
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize()));
- }
- if (RawHash != BlobHash)
- {
- throw std::runtime_error(
- fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash));
- }
-
- IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(
- 0,
- RawSize,
- [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset, SourceSize, Offset);
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- }
- return true;
- }
- return false;
- });
-
- if (AbortFlag)
- {
- return CompositeBuffer{};
- }
-
- if (!CouldDecompress)
- {
- throw std::runtime_error(
- fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize()));
- }
- IoHash ValidateRawHash = Hash.GetHash();
- if (ValidateRawHash != BlobHash)
- {
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information",
- BlobHash,
- Payload.GetSize(),
- ValidateRawHash));
- }
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize()));
- }
- OutCompressedSize = Payload.GetSize();
- OutDecompressedSize = RawSize;
- if (CompressionLevel == OodleCompressionLevel::None)
- {
- // Only decompress to composite if we need it for block verification
- CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite();
- if (!DecompressedComposite)
- {
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize()));
- }
- return DecompressedComposite;
- }
- return CompositeBuffer{};
- }
-
- CompositeBuffer ValidateBlob(BuildStorage& Storage,
- const Oid& BuildId,
- const IoHash& BlobHash,
- uint64_t& OutCompressedSize,
- uint64_t& OutDecompressedSize)
- {
- ZEN_TRACE_CPU("ValidateBlob");
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash);
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash));
- }
- return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
- }
-
- ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload,
- const IoHash& BlobHash,
- uint64_t& OutCompressedSize,
- uint64_t& OutDecompressedSize)
- {
- CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash));
- }
- return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash);
- }
-
- struct ValidateStatistics
- {
- uint64_t BuildBlobSize = 0;
- uint64_t BuildPartSize = 0;
- uint64_t ChunkAttachmentCount = 0;
- uint64_t BlockAttachmentCount = 0;
- std::atomic<uint64_t> VerifiedAttachmentCount = 0;
- std::atomic<uint64_t> VerifiedByteCount = 0;
- uint64_t ElapsedWallTimeUS = 0;
- };
-
- void ValidateBuildPart(BuildStorage& Storage,
- const Oid& BuildId,
- Oid BuildPartId,
- const std::string_view BuildPartName,
- ValidateStatistics& ValidateStats,
- DownloadStatistics& DownloadStats)
+ void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
{
ZEN_TRACE_CPU("ValidateBuildPart");
ProgressBar::SetLogOperationName(ProgressMode, "Validate Part");
- enum TaskSteps : uint32_t
- {
- FetchBuild,
- FetchBuildPart,
- ValidateBlobs,
- Cleanup,
- StepCount
- };
-
- auto EndProgress =
- MakeGuard([&]() { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::StepCount, TaskSteps::StepCount); });
-
- Stopwatch Timer;
- auto _ = MakeGuard([&]() {
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
- BuildId,
- BuildPartId,
- BuildPartName,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- });
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::FetchBuild, TaskSteps::StepCount);
-
- CbObject Build = Storage.GetBuild(BuildId);
- if (!BuildPartName.empty())
- {
- BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId();
- if (BuildPartId == Oid::Zero)
- {
- throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
- }
- }
- ValidateStats.BuildBlobSize = Build.GetSize();
- uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
- {
- PreferredMultipartChunkSize = ChunkSize;
- }
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::FetchBuildPart, TaskSteps::StepCount);
-
- CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
- ValidateStats.BuildPartSize = BuildPart.GetSize();
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
- }
- std::vector<IoHash> ChunkAttachments;
- for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
- }
- ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
- std::vector<IoHash> BlockAttachments;
- for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
- }
- ValidateStats.BlockAttachmentCount = BlockAttachments.size();
-
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
- ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments));
- if (VerifyBlockDescriptions.size() != BlockAttachments.size())
- {
- throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
- BlockAttachments.size() - VerifyBlockDescriptions.size()));
- }
-
- WorkerThreadPool& NetworkPool = GetNetworkPool();
- WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- const std::filesystem::path TempFolder = ".zen-tmp";
-
- CreateDirectories(TempFolder);
- auto __ = MakeGuard([&TempFolder]() {
- if (CleanDirectory(TempFolder, {}))
- {
- std::error_code DummyEc;
- RemoveDir(TempFolder, DummyEc);
- }
- });
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::ValidateBlobs, TaskSteps::StepCount);
-
- ProgressBar ProgressBar(ProgressMode, "Validate Blobs");
-
- uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredVerifiedBytesPerSecond;
-
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
-
- for (const IoHash& ChunkAttachment : ChunkAttachments)
- {
- Work.ScheduleWork(NetworkPool,
- [&Storage,
- &NetworkPool,
- &VerifyPool,
- &Work,
- &DownloadStats,
- &ValidateStats,
- AttachmentsToVerifyCount,
- &TempFolder,
- BuildId = Oid(BuildId),
- PreferredMultipartChunkSize,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- ChunkAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
-
- FilteredDownloadedBytesPerSecond.Start();
- DownloadLargeBlob(
- Storage,
- TempFolder,
- BuildId,
- ChunkAttachment,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&Work,
- &VerifyPool,
- &DownloadStats,
- &ValidateStats,
- AttachmentsToVerifyCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- VerifyPool,
- [&DownloadStats,
- &ValidateStats,
- AttachmentsToVerifyCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- Payload = std::move(Payload),
- ChunkHash](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_Validate");
-
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
- AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- FilteredVerifiedBytesPerSecond.Start();
-
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
- ValidateStats.VerifiedAttachmentCount++;
- ValidateStats.VerifiedByteCount += DecompressedSize;
- if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- });
- }
- });
- }
- });
- }
-
- for (const IoHash& BlockAttachment : BlockAttachments)
- {
- Work.ScheduleWork(
- NetworkPool,
- [&Storage,
- &BuildId,
- &Work,
- &VerifyPool,
- &DownloadStats,
- &ValidateStats,
- AttachmentsToVerifyCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- BlockAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
- }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- VerifyPool,
- [&FilteredVerifiedBytesPerSecond,
- AttachmentsToVerifyCount,
- &ValidateStats,
- Payload = std::move(Payload),
- BlockAttachment](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
-
- FilteredVerifiedBytesPerSecond.Start();
-
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
- ValidateStats.VerifiedAttachmentCount++;
- ValidateStats.VerifiedByteCount += DecompressedSize;
- if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- });
- }
- }
- });
- }
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
-
- const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
- const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
-
- FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
- FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount);
-
- std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
- DownloadedAttachmentCount,
- AttachmentsToVerifyCount,
- NiceBytes(DownloadedByteCount),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- ValidateStats.VerifiedAttachmentCount.load(),
- AttachmentsToVerifyCount,
- NiceBytes(ValidateStats.VerifiedByteCount.load()),
- NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
-
- ProgressBar.UpdateState(
- {.Task = "Validating blobs ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
- .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
-
- ProgressBar.Finish();
- ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
-
- ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount);
+ ConsoleOpLogOutput Output(ProgressMode);
+
+ BuildsOperationValidateBuildPart ValidateOp(Output,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ GetIOWorkerPool(),
+ GetNetworkPool(),
+ BuildId,
+ BuildPartId,
+ BuildPartName,
+ BuildsOperationValidateBuildPart::Options{.IsQuiet = IsQuiet, .IsVerbose = IsVerbose});
+
+ ValidateOp.Execute();
+
+ const uint64_t DownloadedCount = ValidateOp.m_DownloadStats.DownloadedChunkCount + ValidateOp.m_DownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount =
+ ValidateOp.m_DownloadStats.DownloadedChunkByteCount + ValidateOp.m_DownloadStats.DownloadedBlockByteCount;
+ ZEN_CONSOLE("Verified: {:>8} ({}), {}B/sec, {}",
+ DownloadedCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(GetBytesPerSecond(ValidateOp.m_ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000));
}
void UploadFolder(StorageInstance& Storage,
@@ -3096,10 +2672,6 @@ namespace {
const std::filesystem::path ZenTempFolder = ZenTempFolderPath(Options.ZenFolderPath);
CreateDirectories(ZenTempFolder);
- CreateDirectories(ZenTempBlockFolderPath(Options.ZenFolderPath));
- CreateDirectories(ZenTempCacheFolderPath(Options.ZenFolderPath));
- CreateDirectories(ZenTempDownloadFolderPath(Options.ZenFolderPath));
-
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
std::vector<std::pair<Oid, std::string>> AllBuildParts =
@@ -4976,22 +4548,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (m_PostUploadVerify)
{
- ValidateStatistics ValidateStats;
- DownloadStatistics ValidateDownloadStats;
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
- std::string ValidateInfo;
- if (m_PostUploadVerify)
- {
- const uint64_t DownloadedCount =
- ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
- const uint64_t DownloadedByteCount =
- ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
- ValidateInfo = fmt::format("Verified: {:>8} ({}), {}B/sec, {}",
- DownloadedCount,
- NiceBytes(DownloadedByteCount),
- NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
- NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
- }
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
}
}
@@ -5208,7 +4765,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize);
+ ValidateBlob(AbortFlag, *Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize);
if (AbortFlag)
{
throw std::runtime_error("Fetch blob aborted");
@@ -5267,9 +4824,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId();
- ValidateStatistics ValidateStats;
- DownloadStatistics DownloadStats;
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
if (AbortFlag)
{
@@ -5485,11 +5040,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error("Test aborted. (Upload build)");
}
- {
- ValidateStatistics ValidateStats;
- DownloadStatistics ValidateDownloadStats;
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
- }
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(Storage,
@@ -5676,11 +5227,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error("Test aborted. (Upload scrambled)");
}
- {
- ValidateStatistics ValidateStats;
- DownloadStatistics ValidateDownloadStats;
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
- }
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
DownloadFolder(Storage,
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index a4f4237cd..60058c4a4 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -544,6 +544,160 @@ namespace {
return SB.ToString();
}
+ void DownloadLargeBlob(BuildStorage& Storage, // Fetches blob ChunkHash of BuildId via Storage.GetLargeBuildBlob into a temporary file, then passes the file-backed buffer to OnDownloadComplete
+ const std::filesystem::path& DownloadFolder, // folder in which the temporary file is created
+ const Oid& BuildId,
+ const IoHash& ChunkHash,
+ const std::uint64_t PreferredMultipartChunkSize, // forwarded unchanged to Storage.GetLargeBuildBlob
+ ParallelWork& Work, // schedules the returned work items and is polled for abort; captured by reference in the callbacks
+ WorkerThreadPool& NetworkPool, // pool the returned work items are scheduled on
+ DownloadStatistics& DownloadStats, // chunk byte/count and multipart counters are updated here; captured by reference in the callbacks
+ std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) // moved into the completion callback; not invoked when Work reports an abort
+ {
+ ZEN_TRACE_CPU("DownloadLargeBlob");
+ // Throws std::runtime_error if the temporary file cannot be created in DownloadFolder.
+ struct WorkloadData
+ {
+ TemporaryFile TempFile;
+ };
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ // Workload is captured by value (shared_ptr copy) in both callbacks below, so the temporary file stays alive as long as either callback does.
+ std::error_code Ec;
+ Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ }
+ std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
+ DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); // counted even when the write below is skipped because of an abort
+ // Received data is written at the offset supplied by the storage layer.
+ if (!Work.IsAborted())
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive");
+ Workload->TempFile.Write(Chunk.GetView(), Offset);
+ }
+ },
+ [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
+ DownloadStats.DownloadedChunkCount++; // incremented whether or not the work was aborted
+ if (!Work.IsAborted())
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete");
+ // The size is read before Detach() gives up the handle; NOTE(review): presumably TempFile is unusable after Detach() - confirm.
+ uint64_t PayloadSize = Workload->TempFile.FileSize();
+ void* FileHandle = Workload->TempFile.Detach();
+ ZEN_ASSERT(FileHandle != nullptr);
+ IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); // NOTE(review): the trailing 'true' presumably transfers handle ownership to the buffer - confirm against IoBuffer
+ Payload.SetDeleteOnClose(true);
+ OnDownloadComplete(std::move(Payload));
+ }
+ });
+ if (!WorkItems.empty()) // a non-empty work item list is what gets counted as a multipart attachment
+ {
+ DownloadStats.MultipartAttachmentCount++;
+ }
+ for (auto& WorkItem : WorkItems)
+ {
+ Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work");
+ // Work items are skipped entirely once the abort flag passed by the scheduler is set.
+ WorkItem();
+ }
+ });
+ }
+ }
+
+ // Validates a compressed blob payload against its expected raw hash.
+ //
+ // Checks, in order: the payload content type is kCompressedBinary, the compressed header parses,
+ // the raw hash stored in the header equals BlobHash, the payload decompresses over its full raw
+ // size, and the hash of the decompressed data equals BlobHash. Any failed check throws
+ // std::runtime_error with a message naming the blob.
+ //
+ // AbortFlag            polled for every decompressed range; once set, validation stops early
+ // Payload              the compressed blob to validate
+ // BlobHash             the raw hash the blob is expected to have
+ // OutCompressedSize    receives Payload.GetSize() on success; 0 if validation aborted or threw
+ // OutDecompressedSize  receives the raw size from the header on success; 0 if aborted or threw
+ //
+ // Returns the decompressed data when the blob is stored with OodleCompressionLevel::None (needed
+ // for block verification), otherwise an empty CompositeBuffer. An empty buffer is also returned
+ // when AbortFlag is set, so callers must check AbortFlag to tell the two cases apart.
+ CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag,
+                              IoBuffer&&         Payload,
+                              const IoHash&      BlobHash,
+                              uint64_t&          OutCompressedSize,
+                              uint64_t&          OutDecompressedSize)
+ {
+     ZEN_TRACE_CPU("ValidateBlob");
+
+     // Callers pass uninitialized locals and read them back unconditionally; give the outputs a
+     // defined value up front so the abort and exception paths never leave them indeterminate.
+     OutCompressedSize   = 0;
+     OutDecompressedSize = 0;
+
+     if (Payload.GetContentType() != ZenContentType::kCompressedBinary)
+     {
+         throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'",
+                                              BlobHash,
+                                              Payload.GetSize(),
+                                              ToString(Payload.GetContentType())));
+     }
+     IoHash           RawHash;
+     uint64_t         RawSize    = 0;
+     CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
+     if (!Compressed)
+     {
+         throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize()));
+     }
+     if (RawHash != BlobHash)
+     {
+         throw std::runtime_error(
+             fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash));
+     }
+
+     // Hash the decompressed data range by range instead of materializing it in memory.
+     IoHashStream Hash;
+     bool         CouldDecompress = Compressed.DecompressToStream(
+         0,
+         RawSize,
+         [&AbortFlag, &Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+             ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+             if (!AbortFlag)
+             {
+                 for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+                 {
+                     Hash.Append(Segment.GetView());
+                 }
+                 return true;
+             }
+             // Returning false stops the decompression loop when an abort was requested.
+             return false;
+         });
+
+     // Checked before CouldDecompress: an aborted stream also reports failure, and that must not
+     // be turned into a validation error.
+     if (AbortFlag)
+     {
+         return CompositeBuffer{};
+     }
+
+     if (!CouldDecompress)
+     {
+         throw std::runtime_error(
+             fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize()));
+     }
+     IoHash ValidateRawHash = Hash.GetHash();
+     if (ValidateRawHash != BlobHash)
+     {
+         throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information",
+                                              BlobHash,
+                                              Payload.GetSize(),
+                                              ValidateRawHash));
+     }
+     OodleCompressor       Compressor;
+     OodleCompressionLevel CompressionLevel;
+     uint64_t              BlockSize;
+     if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+     {
+         throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize()));
+     }
+     OutCompressedSize   = Payload.GetSize();
+     OutDecompressedSize = RawSize;
+     if (CompressionLevel == OodleCompressionLevel::None)
+     {
+         // Only decompress to composite if we need it for block verification
+         CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite();
+         if (!DecompressedComposite)
+         {
+             throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize()));
+         }
+         return DecompressedComposite;
+     }
+     return CompositeBuffer{};
+ }
+
} // namespace
class ReadFileCache
@@ -1264,6 +1418,8 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput&
, m_LooseChunkHashes(LooseChunkHashes)
, m_Options(Options)
, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath))
+, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath))
+, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath))
{
}
@@ -1288,6 +1444,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount);
+ CreateDirectories(m_CacheFolderPath);
+ CreateDirectories(m_TempDownloadFolderPath);
+ CreateDirectories(m_TempBlockFolderPath);
+
Stopwatch IndexTimer;
if (!m_Options.IsQuiet)
@@ -1836,8 +1996,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
TotalPartWriteCount++;
- std::filesystem::path BlockPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
if (IsFile(BlockPath))
{
CachedChunkBlockIndexes.push_back(BlockIndex);
@@ -2279,7 +2438,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
m_Options.LargeAttachmentSize)
{
DownloadLargeBlob(*m_Storage.BuildStorage,
- ZenTempDownloadFolderPath(m_Options.ZenFolderPath),
+ m_TempDownloadFolderPath,
m_BuildId,
ChunkHash,
m_Options.PreferredMultipartChunkSize,
@@ -2469,7 +2628,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredWrittenBytesPerSecond.Start();
std::filesystem::path BlockChunkPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
if (!BlockBuffer)
{
@@ -2584,12 +2743,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
if (!Ec)
{
BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
+ BlockBuffer = {};
+ BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
RenameFile(TempBlobPath, BlockChunkPath, Ec);
if (Ec)
{
@@ -2609,7 +2767,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
ZEN_TRACE_CPU("WriteTempPartialBlock");
// Could not be moved and rather large, lets store it on disk
BlockChunkPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) /
+ m_TempBlockFolderPath /
fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
BlockBuffer = {};
@@ -2773,9 +2931,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
if (!Ec)
{
BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ BlockBuffer = {};
+ BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
RenameFile(TempBlobPath, BlockChunkPath, Ec);
if (Ec)
{
@@ -2794,8 +2951,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("WriteTempFullBlock");
// Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
BlockBuffer = {};
}
@@ -3540,7 +3696,7 @@ BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_
}
DirectoryContent BlockDirContent;
- GetDirectoryContent(ZenTempBlockFolderPath(m_Options.ZenFolderPath),
+ GetDirectoryContent(m_TempBlockFolderPath,
DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
BlockDirContent);
OutCachedBlocksFound.reserve(BlockDirContent.Files.size());
@@ -3792,7 +3948,7 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
{
ZEN_TRACE_CPU("FindDownloadedChunk");
- std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(m_Options.ZenFolderPath) / ChunkHash.ToHexString();
+ std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
if (IsFile(CompressedChunkPath))
{
IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
@@ -4590,7 +4746,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
{
Payload.SetDeleteOnClose(false);
Payload = {};
- CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
RenameFile(TempBlobPath, CompressedChunkPath, Ec);
if (Ec)
{
@@ -4609,7 +4765,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
{
ZEN_TRACE_CPU("WriteTempChunk");
// Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
Payload = {};
}
@@ -4929,72 +5085,6 @@ BuildsOperationUploadFolder::Execute()
ChunkedFolderContent LocalContent;
{
- auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool {
- for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
- {
- if (RelativePath.starts_with(ExcludeFolder))
- {
- if (RelativePath.length() == ExcludeFolder.length())
- {
- return false;
- }
- else if (RelativePath[ExcludeFolder.length()] == '/')
- {
- return false;
- }
- }
- }
- return true;
- };
-
- auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
- for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
- {
- if (RelativePath.ends_with(ExcludeExtension))
- {
- return false;
- }
- }
- return true;
- };
-
- auto ParseManifest = [](const std::filesystem::path& Path,
- const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> {
- std::vector<std::filesystem::path> AssetPaths;
- std::filesystem::path AbsoluteManifestPath =
- MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath);
- IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten();
- std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize());
- std::string_view::size_type Offset = 0;
- while (Offset < ManifestContent.GetSize())
- {
- size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset);
- if (PathBreakOffset == std::string_view::npos)
- {
- PathBreakOffset = ManifestContent.GetSize();
- }
- std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset);
- if (!AssetPath.empty())
- {
- AssetPaths.emplace_back(std::filesystem::path(AssetPath));
- }
- Offset = PathBreakOffset;
- size_t EolOffset = ManifestString.find_first_of("\r\n", Offset);
- if (EolOffset == std::string_view::npos)
- {
- break;
- }
- Offset = EolOffset;
- size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset);
- if (LineBreakOffset == std::string_view::npos)
- {
- break;
- }
- Offset = LineBreakOffset;
- }
- return AssetPaths;
- };
-
Stopwatch ScanTimer;
FolderContent Content;
if (m_ManifestPath.empty())
@@ -5013,15 +5103,10 @@ BuildsOperationUploadFolder::Execute()
Content = GetFolderContent(
m_LocalFolderScanStats,
m_Path,
- std::move(IsAcceptedFolder),
- [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath,
- uint64_t Size,
- uint32_t Attributes) -> bool {
- if (RelativePath == m_Options.ZenExcludeManifestName)
- {
- return false;
- }
- if (!IsAcceptedFile(RelativePath, Size, Attributes))
+ [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); },
+ [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool {
+ ZEN_UNUSED(Size, Attributes);
+ if (!IsAcceptedFile(RelativePath))
{
return false;
}
@@ -5342,7 +5427,7 @@ BuildsOperationUploadFolder::Execute()
}
for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
{
- for (const IoHash& ChunkHash : Block.ChunkHashes)
+ for (const IoHash& ChunkHash : Block.ChunkRawHashes)
{
ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
AbsoluteChunkHashes.push_back(ChunkHash);
@@ -5638,24 +5723,6 @@ BuildsOperationUploadFolder::Execute()
}
}
- // if (m_PostUploadVerify && !m_AbortFlag)
- // {
- // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount);
- // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
- // std::string ValidateInfo;
- // if (m_PostUploadVerify)
- // {
- // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
- // const uint64_t DownloadedByteCount =
- // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
- // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}",
- // DownloadedCount,
- // NiceBytes(DownloadedByteCount),
- // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
- // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
- // }
- // }
-
m_Storage.BuildStorage->PutBuildPartStats(
m_BuildId,
m_BuildPartId,
@@ -5674,74 +5741,78 @@ BuildsOperationUploadFolder::Execute()
m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
}
-void
-DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& DownloadFolder,
- const Oid& BuildId,
- const IoHash& ChunkHash,
- const std::uint64_t PreferredMultipartChunkSize,
- ParallelWork& Work,
- WorkerThreadPool& NetworkPool,
- DownloadStatistics& DownloadStats,
- std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
+// Reads the manifest file and returns the asset paths it lists.
+// ManifestPath is used as-is when it is absolute, otherwise it is resolved against Path.
+// Each entry is the text up to the first tab, carriage return or line feed; anything after a
+// tab on the same line is skipped, and empty entries are not added to the result.
+std::vector<std::filesystem::path>
+BuildsOperationUploadFolder::ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath)
{
- ZEN_TRACE_CPU("DownloadLargeBlob");
-
- struct WorkloadData
- {
- TemporaryFile TempFile;
- };
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
-
- std::error_code Ec;
- Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
- if (Ec)
+ std::vector<std::filesystem::path> AssetPaths;
+ std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath);
+ IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten();
+ std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize());
+ std::string_view::size_type Offset = 0;
+ while (Offset < ManifestContent.GetSize())
{
- throw std::runtime_error(
- fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ // The path ends at the first tab or line break, or at the end of the manifest.
+ size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset);
+ if (PathBreakOffset == std::string_view::npos)
+ {
+ PathBreakOffset = ManifestContent.GetSize();
+ }
+ std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset);
+ if (!AssetPath.empty())
+ {
+ AssetPaths.emplace_back(std::filesystem::path(AssetPath));
+ }
+ Offset = PathBreakOffset;
+ // Skip whatever follows a tab, up to the end of the current line.
+ size_t EolOffset = ManifestString.find_first_of("\r\n", Offset);
+ if (EolOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = EolOffset;
+ // Move past the line break plus any further tabs and line breaks before the next entry.
+ size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset);
+ if (LineBreakOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = LineBreakOffset;
}
- std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
- DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
+ return AssetPaths;
+}
- if (!Work.IsAborted())
+// Returns false when RelativePath is one of m_Options.ExcludeFolders or lies beneath one of
+// them, true otherwise. A prefix match only counts when it ends on a path component boundary,
+// i.e. the path equals the excluded folder or continues with '/'.
+bool
+BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const
+{
+ for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
+ {
+ if (RelativePath.starts_with(ExcludeFolder))
+ {
+ // Exact match: the folder itself is excluded.
+ if (RelativePath.length() == ExcludeFolder.length())
{
- ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive");
- Workload->TempFile.Write(Chunk.GetView(), Offset);
+ return false;
}
- },
- [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
- DownloadStats.DownloadedChunkCount++;
- if (!Work.IsAborted())
+ // Longer path: only excluded when the prefix is followed by a separator.
+ else if (RelativePath[ExcludeFolder.length()] == '/')
{
- ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete");
-
- uint64_t PayloadSize = Workload->TempFile.FileSize();
- void* FileHandle = Workload->TempFile.Detach();
- ZEN_ASSERT(FileHandle != nullptr);
- IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
- Payload.SetDeleteOnClose(true);
- OnDownloadComplete(std::move(Payload));
+ return false;
}
- });
- if (!WorkItems.empty())
+ }
+ }
+ return true;
+}
+
+// Returns false for the exclude-manifest file itself (m_Options.ZenExcludeManifestName) and for
+// any path ending in one of m_Options.ExcludeExtensions; returns true for everything else.
+bool
+BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const
+{
+ if (RelativePath == m_Options.ZenExcludeManifestName)
{
- DownloadStats.MultipartAttachmentCount++;
+ return false;
}
- for (auto& WorkItem : WorkItems)
+ for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
{
- Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work");
-
- WorkItem();
- }
- });
+ // Plain suffix comparison against the relative path.
+ if (RelativePath.ends_with(ExcludeExtension))
+ {
+ return false;
+ }
}
+ return true;
}
std::vector<size_t>
@@ -7130,4 +7201,317 @@ BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content,
return std::move(CompressedBlob).GetCompressed();
}
+// Stores the collaborators and identifiers that Execute() works with.
+// The log output, storage, abort/pause flags and worker pools are kept as references and are
+// not owned by this object; the ids, the part name and the options are copied.
+BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(BuildOpLogOutput&      LogOutput,
+                                                                   BuildStorage&          Storage,
+                                                                   std::atomic<bool>&     AbortFlag,
+                                                                   std::atomic<bool>&     PauseFlag,
+                                                                   WorkerThreadPool&      IOWorkerPool,
+                                                                   WorkerThreadPool&      NetworkPool,
+                                                                   const Oid&             BuildId,
+                                                                   const Oid&             BuildPartId,
+                                                                   const std::string_view BuildPartName,
+                                                                   const Options&         Options)
+
+: m_LogOutput{LogOutput}
+, m_Storage{Storage}
+, m_AbortFlag{AbortFlag}
+, m_PauseFlag{PauseFlag}
+, m_IOWorkerPool{IOWorkerPool}
+, m_NetworkPool{NetworkPool}
+, m_BuildId{BuildId}
+, m_BuildPartId{BuildPartId}
+, m_BuildPartName{BuildPartName}
+, m_Options{Options}
+{
+    // Nothing else to do: all work happens in Execute().
+}
+
+// Validates every attachment referenced by a build part.
+//
+// Steps:
+//  1. Fetch the build object; when a part name was given, resolve it to a part id.
+//  2. Fetch the build part and collect its chunk and block attachment hashes.
+//  3. Download each attachment on the network pool and verify it on the IO pool.
+//  4. Report progress through m_LogOutput while the scheduled work completes.
+//
+// Results accumulate in m_ValidateStats and m_DownloadStats.
+//
+// Throws std::runtime_error when the named part does not exist in the build, when block
+// metadata is missing for some block attachments, or when a block attachment cannot be fetched.
+void
+BuildsOperationValidateBuildPart::Execute()
+{
+    ZEN_TRACE_CPU("ValidateBuildPart");
+
+    enum TaskSteps : uint32_t
+    {
+        FetchBuild,
+        FetchBuildPart,
+        ValidateBlobs,
+        Cleanup,
+        StepCount
+    };
+
+    // Always report the operation as finished when leaving this function.
+    auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
+    Stopwatch Timer;
+    auto      _ = MakeGuard([&]() {
+        if (!m_Options.IsQuiet)
+        {
+            ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
+                        m_BuildId,
+                        m_BuildPartId,
+                        m_BuildPartName,
+                        NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        }
+    });
+
+    m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount);
+
+    CbObject Build = m_Storage.GetBuild(m_BuildId);
+    if (!m_BuildPartName.empty())
+    {
+        // A part name takes precedence over the part id passed to the constructor.
+        m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId();
+        if (m_BuildPartId == Oid::Zero)
+        {
+            throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName));
+        }
+    }
+    m_ValidateStats.BuildBlobSize = Build.GetSize();
+
+    // Default to 32 MiB unless the build object carries a non-zero "chunkSize".
+    uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
+    if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+    {
+        PreferredMultipartChunkSize = ChunkSize;
+    }
+
+    m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount);
+
+    CbObject BuildPart            = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId);
+    m_ValidateStats.BuildPartSize = BuildPart.GetSize();
+    if (!m_Options.IsQuiet)
+    {
+        ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize()));
+    }
+    std::vector<IoHash> ChunkAttachments;
+    for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
+    {
+        ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
+    }
+    m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
+    std::vector<IoHash> BlockAttachments;
+    for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
+    {
+        BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
+    }
+    m_ValidateStats.BlockAttachmentCount = BlockAttachments.size();
+
+    // The descriptions are only used to check that metadata exists for every block attachment.
+    std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
+        ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments));
+    if (VerifyBlockDescriptions.size() != BlockAttachments.size())
+    {
+        throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
+                                             BlockAttachments.size() - VerifyBlockDescriptions.size()));
+    }
+
+    ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+    // Relative path, so the scratch folder is created under the current working directory.
+    const std::filesystem::path TempFolder = ".zen-tmp";
+
+    CreateDirectories(TempFolder);
+    auto __ = MakeGuard([&TempFolder]() {
+        // Best-effort cleanup: only remove the folder when its content could be cleaned.
+        if (CleanDirectory(TempFolder, {}))
+        {
+            std::error_code DummyEc;
+            RemoveDir(TempFolder, DummyEc);
+        }
+    });
+
+    m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount);
+
+    std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs"));
+    BuildOpLogOutput::ProgressBar&                 Progress(*ProgressBarPtr);
+
+    uint64_t     AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+    FilteredRate FilteredDownloadedBytesPerSecond;
+    FilteredRate FilteredVerifiedBytesPerSecond;
+
+    // NOTE(review): the tasks below capture locals of this function by reference; this presumably
+    // relies on Work.Wait() further down returning only after all scheduled work has finished.
+
+    // Chunk attachments: download (possibly multipart) into TempFolder, then verify on the IO pool.
+    for (const IoHash& ChunkAttachment : ChunkAttachments)
+    {
+        Work.ScheduleWork(
+            m_NetworkPool,
+            [this,
+             &Work,
+             AttachmentsToVerifyCount,
+             &TempFolder,
+             PreferredMultipartChunkSize,
+             &FilteredDownloadedBytesPerSecond,
+             &FilteredVerifiedBytesPerSecond,
+             ChunkAttachment](std::atomic<bool>&) {
+                if (!m_AbortFlag)
+                {
+                    ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
+
+                    FilteredDownloadedBytesPerSecond.Start();
+                    DownloadLargeBlob(
+                        m_Storage,
+                        TempFolder,
+                        m_BuildId,
+                        ChunkAttachment,
+                        PreferredMultipartChunkSize,
+                        Work,
+                        m_NetworkPool,
+                        m_DownloadStats,
+                        [this,
+                         &Work,
+                         AttachmentsToVerifyCount,
+                         &FilteredDownloadedBytesPerSecond,
+                         &FilteredVerifiedBytesPerSecond,
+                         ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+                            Payload.SetContentType(ZenContentType::kCompressedBinary);
+                            if (!m_AbortFlag)
+                            {
+                                Work.ScheduleWork(
+                                    m_IOWorkerPool,
+                                    [this,
+                                     AttachmentsToVerifyCount,
+                                     &FilteredDownloadedBytesPerSecond,
+                                     &FilteredVerifiedBytesPerSecond,
+                                     Payload = std::move(Payload),
+                                     ChunkHash](std::atomic<bool>&) mutable {
+                                        if (!m_AbortFlag)
+                                        {
+                                            ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+
+                                            // Stop the download rate once every attachment has been downloaded.
+                                            if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount ==
+                                                AttachmentsToVerifyCount)
+                                            {
+                                                FilteredDownloadedBytesPerSecond.Stop();
+                                            }
+
+                                            FilteredVerifiedBytesPerSecond.Start();
+
+                                            uint64_t CompressedSize;
+                                            uint64_t DecompressedSize;
+                                            ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
+                                            m_ValidateStats.VerifiedAttachmentCount++;
+                                            m_ValidateStats.VerifiedByteCount += DecompressedSize;
+                                            if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+                                            {
+                                                FilteredVerifiedBytesPerSecond.Stop();
+                                            }
+                                        }
+                                    });
+                            }
+                        });
+                }
+            });
+    }
+
+    // Block attachments: fetched in one request, then verified as chunk blocks on the IO pool.
+    for (const IoHash& BlockAttachment : BlockAttachments)
+    {
+        Work.ScheduleWork(
+            m_NetworkPool,
+            [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment](
+                std::atomic<bool>&) {
+                if (!m_AbortFlag)
+                {
+                    ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+
+                    FilteredDownloadedBytesPerSecond.Start();
+                    IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment);
+                    m_DownloadStats.DownloadedBlockCount++;
+                    m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+                    if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
+                    {
+                        FilteredDownloadedBytesPerSecond.Stop();
+                    }
+                    if (!Payload)
+                    {
+                        throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+                    }
+                    if (!m_AbortFlag)
+                    {
+                        Work.ScheduleWork(m_IOWorkerPool,
+                                          [this,
+                                           &FilteredVerifiedBytesPerSecond,
+                                           AttachmentsToVerifyCount,
+                                           Payload = std::move(Payload),
+                                           BlockAttachment](std::atomic<bool>&) mutable {
+                                              if (!m_AbortFlag)
+                                              {
+                                                  ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
+
+                                                  FilteredVerifiedBytesPerSecond.Start();
+
+                                                  uint64_t CompressedSize;
+                                                  uint64_t DecompressedSize;
+                                                  ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+                                                  m_ValidateStats.VerifiedAttachmentCount++;
+                                                  m_ValidateStats.VerifiedByteCount += DecompressedSize;
+                                                  if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+                                                  {
+                                                      FilteredVerifiedBytesPerSecond.Stop();
+                                                  }
+                                              }
+                                          });
+                    }
+                }
+            });
+    }
+
+    Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+        ZEN_UNUSED(PendingWork);
+
+        const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount;
+        const uint64_t DownloadedByteCount       = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount;
+
+        FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
+        FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount);
+
+        // The download rate is shown in bits per second (hence the * 8), the verify rate in bytes per second.
+        std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
+                                          DownloadedAttachmentCount,
+                                          AttachmentsToVerifyCount,
+                                          NiceBytes(DownloadedByteCount),
+                                          NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+                                          m_ValidateStats.VerifiedAttachmentCount.load(),
+                                          AttachmentsToVerifyCount,
+                                          NiceBytes(m_ValidateStats.VerifiedByteCount.load()),
+                                          NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
+
+        // Every attachment counts twice towards the total: once for download, once for verification.
+        Progress.UpdateState(
+            {.Task           = "Validating blobs ",
+             .Details        = Details,
+             .TotalCount     = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
+             .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
+                                                     (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())),
+             .Status         = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+            false);
+    });
+
+    Progress.Finish();
+    m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
+
+    m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+}
+
+// Fetches the blob identified by BlobHash from Storage and hands it to the payload-taking
+// ValidateBlob overload, returning whatever that overload returns.
+// Throws std::runtime_error when the storage returns no payload for the blob.
+CompositeBuffer
+ValidateBlob(std::atomic<bool>& AbortFlag,
+             BuildStorage&      Storage,
+             const Oid&         BuildId,
+             const IoHash&      BlobHash,
+             uint64_t&          OutCompressedSize,
+             uint64_t&          OutDecompressedSize)
+{
+    ZEN_TRACE_CPU("ValidateBlob");
+    if (IoBuffer FetchedBlob = Storage.GetBuildBlob(BuildId, BlobHash); !FetchedBlob)
+    {
+        throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash));
+    }
+    else
+    {
+        return ValidateBlob(AbortFlag, std::move(FetchedBlob), BlobHash, OutCompressedSize, OutDecompressedSize);
+    }
+}
+
+// Validates a downloaded chunk block payload and returns its block description.
+// The payload is first run through ValidateBlob; an empty result is reported as the block not
+// being stored with the 'None' compression level (see the error text below).
+// Throws std::runtime_error in that case.
+ChunkBlockDescription
+BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&&    Payload,
+                                                     const IoHash& BlobHash,
+                                                     uint64_t&     OutCompressedSize,
+                                                     uint64_t&     OutDecompressedSize)
+{
+    if (CompositeBuffer ValidatedBlock = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
+        !ValidatedBlock)
+    {
+        throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash));
+    }
+    else
+    {
+        return GetChunkBlockDescription(ValidatedBlock.Flatten(), BlobHash);
+    }
+}
+
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 8ba32127a..be6720d5d 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -374,6 +374,8 @@ private:
const std::vector<IoHash>& m_LooseChunkHashes;
const Options m_Options;
const std::filesystem::path m_CacheFolderPath;
+ const std::filesystem::path m_TempDownloadFolderPath;
+ const std::filesystem::path m_TempBlockFolderPath;
};
struct FindBlocksStatistics
@@ -513,6 +515,11 @@ public:
LooseChunksStatistics m_LooseChunksStats;
private:
+ std::vector<std::filesystem::path> ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath);
+
+ bool IsAcceptedFolder(const std::string_view& RelativePath) const;
+ bool IsAcceptedFile(const std::string_view& RelativePath) const;
+
std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks,
std::span<const IoHash> ChunkHashes,
std::span<const uint32_t> ChunkIndexes,
@@ -611,14 +618,64 @@ private:
const Options m_Options;
};
-void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& DownloadFolder,
- const Oid& BuildId,
- const IoHash& ChunkHash,
- const std::uint64_t PreferredMultipartChunkSize,
- ParallelWork& Work,
- WorkerThreadPool& NetworkPool,
- DownloadStatistics& DownloadStats,
- std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete);
+// Sizes and counters collected while validating a build part.
+// The two Verified* counters are atomic; the remaining fields are plain integers.
+struct ValidateStatistics
+{
+    uint64_t              BuildBlobSize{0};            // size of the fetched build object
+    uint64_t              BuildPartSize{0};            // size of the fetched build part object
+    uint64_t              ChunkAttachmentCount{0};     // chunk attachments listed by the build part
+    uint64_t              BlockAttachmentCount{0};     // block attachments listed by the build part
+    std::atomic<uint64_t> VerifiedAttachmentCount{0};  // attachments verified so far
+    std::atomic<uint64_t> VerifiedByteCount{0};        // decompressed bytes verified so far
+    uint64_t              ElapsedWallTimeUS{0};        // wall time of the validation, in microseconds
+};
+
+// Operation that downloads and verifies every chunk and block attachment of one build part.
+// Construct it with the collaborators it should use, call Execute(), then read the public
+// statistics members.
+class BuildsOperationValidateBuildPart
+{
+public:
+ struct Options
+ {
+ // When set, Execute() does not print its console messages.
+ bool IsQuiet = false;
+ // NOTE(review): not read by the code visible in this change - confirm intended use.
+ bool IsVerbose = false;
+ };
+ // Reference arguments are stored as references (see the members below); ids, name and
+ // options are copied.
+ BuildsOperationValidateBuildPart(BuildOpLogOutput& LogOutput,
+ BuildStorage& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ const std::string_view BuildPartName,
+ const Options& Options);
+
+ // Runs the validation; throws std::runtime_error on missing parts, metadata or blobs.
+ void Execute();
+
+ // Filled in by Execute().
+ ValidateStatistics m_ValidateStats;
+ DownloadStatistics m_DownloadStats;
+
+private:
+ // Validates a downloaded block payload and returns its chunk block description.
+ ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload,
+ const IoHash& BlobHash,
+ uint64_t& OutCompressedSize,
+ uint64_t& OutDecompressedSize);
+
+ BuildOpLogOutput& m_LogOutput;
+ BuildStorage& m_Storage;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ WorkerThreadPool& m_IOWorkerPool;
+ WorkerThreadPool& m_NetworkPool;
+ const Oid m_BuildId;
+ // Not const: Execute() overwrites it when a build part name was supplied.
+ Oid m_BuildPartId;
+ const std::string m_BuildPartName;
+ const Options m_Options;
+};
+
+// Fetches the blob BlobHash of build BuildId from Storage and validates it.
+// Throws std::runtime_error when the blob cannot be fetched. The two size arguments are
+// output parameters passed on to the validating overload.
+CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag,
+ BuildStorage& Storage,
+ const Oid& BuildId,
+ const IoHash& BlobHash,
+ uint64_t& OutCompressedSize,
+ uint64_t& OutDecompressedSize);
} // namespace zen