aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-22 12:25:08 +0200
committerGitHub Enterprise <[email protected]>2025-10-22 12:25:08 +0200
commit2f0298d3874d0e4cb351e6b7f18566f6dc0df388 (patch)
tree664b3d6a2bcb0f7f420a09227cabd1c0b1e5b6a3 /src
parenttweak worker pools for builds (#595) (diff)
downloadzen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.tar.xz
zen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.zip
add `zen builds prime-cache` command (#598)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp178
-rw-r--r--src/zen/cmds/builds_cmd.h7
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp4
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp274
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h2
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h42
6 files changed, 481 insertions, 26 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 1f748400a..343702ade 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -3033,6 +3033,39 @@ BuildsCommand::BuildsCommand()
m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"});
m_FetchBlobOptions.positional_help("build-id blob-hash");
+ // prime-cache
+ AddSystemOptions(m_PrimeCacheOptions);
+ AddCloudOptions(m_PrimeCacheOptions);
+ AddFileOptions(m_PrimeCacheOptions);
+ AddOutputOptions(m_PrimeCacheOptions);
+ AddCacheOptions(m_PrimeCacheOptions);
+ AddZenFolderOptions(m_PrimeCacheOptions);
+ m_PrimeCacheOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
+ m_PrimeCacheOptions.add_option(
+ "",
+ "",
+ "build-part-id",
+ "Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded",
+ cxxopts::value(m_BuildPartIds),
+ "<id>");
+ m_PrimeCacheOptions.add_option("",
+ "",
+ "build-part-name",
+ "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given "
+ "all parts will be downloaded",
+ cxxopts::value(m_BuildPartNames),
+ "<name>");
+
+ m_PrimeCacheOptions.add_option("",
+ "",
+ "force",
+ "Force download of all blobs by ignoring any existing blobs in cache",
+ cxxopts::value(m_Force),
+ "<force>");
+
+ m_PrimeCacheOptions.parse_positional({"build-id"});
+ m_PrimeCacheOptions.positional_help("build-id");
+
auto AddZenProcessOptions = [this](cxxopts::Options& Ops) {
Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>");
};
@@ -3241,7 +3274,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildStorageCache::Statistics& StorageCacheStats,
const std::filesystem::path& TempPath,
bool RequireNamespace,
- bool RequireBucket) -> StorageInstance {
+ bool RequireBucket,
+ bool BoostCacheBackgroundWorkerPool) -> StorageInstance {
ParseStorageOptions(RequireNamespace, RequireBucket);
StorageInstance Result;
@@ -3420,21 +3454,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(m_ZenCacheHost, false); TestResult.Success)
{
- Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost,
+ Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost,
HttpClientSettings{.LogCategory = "httpcacheclient",
- .ConnectTimeout = std::chrono::milliseconds{3000},
- .Timeout = std::chrono::milliseconds{30000},
- .AssumeHttp2 = CacheAssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0},
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = CacheAssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0},
[]() { return AbortFlag.load(); });
- Result.BuildCacheStorage = CreateZenBuildStorageCache(
- *Result.CacheHttp,
- StorageCacheStats,
- m_Namespace,
- m_Bucket,
- TempPath / "zencache",
- m_PrimeCacheOnly ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background));
+ Result.BuildCacheStorage =
+ CreateZenBuildStorageCache(*Result.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ TempPath / "zencache",
+ BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background));
CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'",
BuildCacheName.empty() ? "" : fmt::format("{}, ", BuildCacheName),
m_ZenCacheHost,
@@ -3679,7 +3714,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(ZenFolderPath),
/*RequriesNamespace*/ false,
- /*RequireBucket*/ false);
+ /*RequireBucket*/ false,
+ /*BoostCacheBackgroundWorkerPool */ false);
CbObject Response = Storage.BuildStorage->ListNamespaces(m_ListNamespacesRecursive);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::Default) == CbValidateError::None);
@@ -3769,7 +3805,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ false);
+ /*RequireBucket*/ false,
+ /*BoostCacheBackgroundWorkerPool */ false);
CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::Default) == CbValidateError::None);
@@ -3835,7 +3872,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
const Oid BuildId = ParseBuildId();
@@ -3907,7 +3945,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
if (m_BuildPartName.empty())
{
@@ -4024,7 +4063,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool*/ m_PrimeCacheOnly);
const Oid BuildId = ParseBuildId();
@@ -4102,7 +4142,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
const Oid BuildId = ParseBuildId();
@@ -4136,6 +4177,91 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+ if (SubOption == &m_PrimeCacheOptions)
+ {
+ if (!IsQuiet)
+ {
+ LogExecutableVersionAndPid();
+ }
+
+ InitializeWorkerPools(m_BoostWorkerThreads);
+
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ if (m_ZenFolderPath.empty())
+ {
+ m_ZenFolderPath = std::filesystem::current_path() / ZenFolderName;
+ }
+ MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
+
+ CreateDirectories(m_ZenFolderPath);
+ auto _ = MakeGuard([this]() {
+ if (CleanDirectory(m_ZenFolderPath, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(m_ZenFolderPath, DummyEc);
+ }
+ });
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ true);
+
+ const Oid BuildId = ParseBuildId();
+
+ std::vector<Oid> BuildPartIds = ParseBuildPartIds();
+ std::vector<std::string> BuildPartNames = ParseBuildPartNames();
+
+ std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
+
+ std::vector<std::pair<Oid, std::string>> AllBuildParts =
+ ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
+
+ std::vector<Oid> AllBuildPartIds;
+ AllBuildPartIds.reserve(AllBuildParts.size());
+ for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts)
+ {
+ AllBuildPartIds.push_back(BuildPart.first);
+ }
+
+ ProgressBar::SetLogOperationName(ProgressMode, "Prime Cache");
+
+ ConsoleOpLogOutput Output(ProgressMode);
+
+ BuildsOperationPrimeCache PrimeOp(Output,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ GetNetworkPool(),
+ BuildId,
+ AllBuildPartIds,
+ BuildsOperationPrimeCache::Options{.IsQuiet = IsQuiet,
+ .IsVerbose = IsVerbose,
+ .ZenFolderPath = m_ZenFolderPath,
+ .LargeAttachmentSize = PreferredMultipartChunkSize * 4u,
+ .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
+ .ForceUpload = m_Force},
+ StorageCacheStats);
+ PrimeOp.Execute();
+
+ if (!IsQuiet)
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ ZEN_CONSOLE("Uploaded {} ({}) blobs",
+ StorageCacheStats.PutBlobCount.load(),
+ NiceBytes(StorageCacheStats.PutBlobByteCount),
+ Storage.CacheName);
+ }
+ }
+
+ return;
+ }
+
if (SubOption == &m_FetchBlobOptions)
{
if (!IsQuiet)
@@ -4167,7 +4293,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
IoHash BlobHash = ParseBlobHash();
@@ -4223,7 +4350,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
Oid BuildId = ParseBuildId();
@@ -4270,7 +4398,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
@@ -4404,7 +4533,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
StorageCacheStats,
ZenTempFolderPath(m_ZenFolderPath),
/*RequriesNamespace*/ true,
- /*RequireBucket*/ true);
+ /*RequireBucket*/ true,
+ /*BoostCacheBackgroundWorkerPool */ false);
m_BuildId = Oid::NewOid().ToString();
m_BuildPartName = m_Path.filename().string();
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index e909a6155..ef5ac1e10 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -16,7 +16,7 @@ class BuildsCommand : public CacheStoreCommand
public:
static constexpr char Name[] = "builds";
static constexpr char Description[] =
- "Manage builds - list, list-namespaces, ls, upload, download, diff, fetch-blob, validate-part, pause, resume, abort";
+ "Manage builds - list, list-namespaces, ls, upload, download, prime-cache, diff, fetch-blob, validate-part, pause, resume, abort";
BuildsCommand();
~BuildsCommand();
@@ -106,6 +106,8 @@ private:
cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"};
std::string m_BlobHash;
+ cxxopts::Options m_PrimeCacheOptions{"prime-cache", "Prime cache from a remote store"};
+
cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"};
cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"};
cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"};
@@ -119,7 +121,7 @@ private:
cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"};
std::vector<std::string> m_BuildIds;
- cxxopts::Options* m_SubCommands[14] = {&m_ListNamespacesOptions,
+ cxxopts::Options* m_SubCommands[15] = {&m_ListNamespacesOptions,
&m_ListOptions,
&m_ListBlocksOptions,
&m_UploadOptions,
@@ -130,6 +132,7 @@ private:
&m_DiffOptions,
&m_LsOptions,
&m_FetchBlobOptions,
+ &m_PrimeCacheOptions,
&m_ValidateBuildPartOptions,
&m_TestOptions,
&m_MultiTestDownloadOptions};
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index d36d75480..2e9cf9954 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -104,6 +104,10 @@ public:
m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
Payload,
ContentType);
+
+ m_Stats.PutBlobCount++;
+ m_Stats.PutBlobByteCount += Payload.GetSize();
+
AddStatistic(CacheResponse);
if (!CacheResponse.IsSuccess())
{
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index c9f142d7a..f106b7b18 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -7099,6 +7099,280 @@ BuildsOperationValidateBuildPart::Execute()
}
}
+BuildsOperationPrimeCache::BuildsOperationPrimeCache(BuildOpLogOutput& LogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ std::span<const Oid> BuildPartIds,
+ const Options& Options,
+ BuildStorageCache::Statistics& StorageCacheStats)
+: m_LogOutput(LogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end())
+, m_Options(Options)
+, m_StorageCacheStats(StorageCacheStats)
+{
+ m_TempPath = m_Options.ZenFolderPath / "tmp";
+ CreateDirectories(m_TempPath);
+}
+
+void
+BuildsOperationPrimeCache::Execute()
+{
+ ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute");
+
+ Stopwatch PrimeTimer;
+
+ tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> LooseChunkRawSizes;
+
+ tsl::robin_set<IoHash, IoHash::Hasher> BuildBlobs;
+
+ for (const Oid& BuildPartId : m_BuildPartIds)
+ {
+ CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId);
+
+ CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView();
+ std::vector<IoHash> BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView);
+
+ CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView();
+ std::vector<IoHash> ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView);
+ std::vector<uint64_t> ChunkRawSizes = compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkAttachmentsView);
+ if (ChunkAttachments.size() != ChunkRawSizes.size())
+ {
+ throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}",
+ ChunkAttachments.size(),
+ ChunkRawSizes.size()));
+ }
+
+ BuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size());
+ BuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end());
+ BuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end());
+
+ for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++)
+ {
+ LooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]);
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Found {} referenced blobs", BuildBlobs.size());
+ }
+
+ if (BuildBlobs.empty())
+ {
+ return;
+ }
+
+ std::vector<IoHash> BlobsToDownload;
+ BlobsToDownload.reserve(BuildBlobs.size());
+
+ if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
+ {
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (!CacheExistsResult[BlobIndex].HasBody)
+ {
+ BlobsToDownload.push_back(BlobHashes[BlobIndex]);
+ }
+ }
+ size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size();
+
+ if (FoundCount > 0 && !m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Remote cache : Found {} out of {} needed blobs in {}",
+ FoundCount,
+ BuildBlobs.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ }
+ }
+ else
+ {
+ BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end());
+ }
+
+ if (BlobsToDownload.empty())
+ {
+ return;
+ }
+
+ std::atomic<uint64_t> MultipartAttachmentCount;
+ std::atomic<size_t> CompletedDownloadCount;
+ FilteredRate FilteredDownloadedBytesPerSecond;
+
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ const size_t BlobCount = BlobsToDownload.size();
+
+ for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++)
+ {
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &Work,
+ &BlobsToDownload,
+ BlobCount,
+ &LooseChunkRawSizes,
+ &CompletedDownloadCount,
+ &FilteredDownloadedBytesPerSecond,
+ &MultipartAttachmentCount,
+ BlobIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ const IoHash& BlobHash = BlobsToDownload[BlobIndex];
+
+ bool IsLargeBlob = false;
+
+ if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end())
+ {
+ IsLargeBlob = It->second >= m_Options.LargeAttachmentSize;
+ }
+
+ FilteredDownloadedBytesPerSecond.Start();
+
+ if (IsLargeBlob)
+ {
+ DownloadLargeBlob(*m_Storage.BuildStorage,
+ m_TempPath,
+ m_BuildId,
+ BlobHash,
+ m_Options.PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats.DownloadedChunkByteCount,
+ MultipartAttachmentCount,
+ [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount](
+ IoBuffer&& Payload) {
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.RequestsCompleteCount++;
+
+ if (!m_AbortFlag)
+ {
+ if (Payload && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ }
+ CompletedDownloadCount++;
+ if (CompletedDownloadCount == BlobCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ });
+ }
+ else
+ {
+ IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ m_DownloadStats.RequestsCompleteCount++;
+
+ if (!m_AbortFlag)
+ {
+ if (Payload && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(std::move(Payload))));
+ }
+ }
+ CompletedDownloadCount++;
+ if (CompletedDownloadCount == BlobCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+
+ uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load();
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
+
+ std::string DownloadRateString = (CompletedDownloadCount == BlobCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string UploadDetails = m_Storage.BuildCacheStorage ? fmt::format(" {} ({}) uploaded.",
+ m_StorageCacheStats.PutBlobCount.load(),
+ NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()))
+ : "";
+
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ CompletedDownloadCount.load(),
+ BlobCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ UploadDetails);
+ Progress.UpdateState({.Task = "Downloading",
+ .Details = Details,
+ .TotalCount = BlobCount,
+ .RemainingCount = BlobCount - CompletedDownloadCount.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ FilteredDownloadedBytesPerSecond.Stop();
+
+ Progress.Finish();
+ }
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
+ ZEN_UNUSED(Remaining);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName);
+ }
+ return !m_AbortFlag;
+ });
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load();
+ LOG_OUTPUT(m_LogOutput,
+ "Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}",
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
+ NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
+ MultipartAttachmentCount.load(),
+ NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs()));
+ }
+}
+
CompositeBuffer
ValidateBlob(std::atomic<bool>& AbortFlag,
BuildStorage& Storage,
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
index e30270848..f1916da10 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
@@ -26,6 +26,8 @@ public:
std::atomic<uint64_t> PeakSentBytes = 0;
std::atomic<uint64_t> PeakReceivedBytes = 0;
std::atomic<uint64_t> PeakBytesPerSec = 0;
+ std::atomic<uint64_t> PutBlobCount = 0;
+ std::atomic<uint64_t> PutBlobByteCount = 0;
};
virtual ~BuildStorageCache() {}
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 8b5b63ef9..157435971 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -6,6 +6,7 @@
#include <zencore/logging.h>
#include <zencore/uid.h>
#include <zencore/zencore.h>
+#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenutil/bufferedwritefilecache.h>
@@ -755,6 +756,47 @@ private:
const Options m_Options;
};
+class BuildsOperationPrimeCache
+{
+public:
+ struct Options
+ {
+ bool IsQuiet = false;
+ bool IsVerbose = false;
+ std::filesystem::path ZenFolderPath;
+ std::uint64_t LargeAttachmentSize = 32u * 1024u * 1024u * 4u;
+ std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
+ bool ForceUpload = false;
+ };
+
+ BuildsOperationPrimeCache(BuildOpLogOutput& LogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ std::span<const Oid> BuildPartIds,
+ const Options& Options,
+ BuildStorageCache::Statistics& StorageCacheStats);
+
+ void Execute();
+
+ DownloadStatistics m_DownloadStats;
+
+private:
+ BuildOpLogOutput& m_LogOutput;
+ StorageInstance& m_Storage;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ WorkerThreadPool& m_NetworkPool;
+ const Oid m_BuildId;
+ std::vector<Oid> m_BuildPartIds;
+ Options m_Options;
+ std::filesystem::path m_TempPath;
+
+ BuildStorageCache::Statistics& m_StorageCacheStats;
+};
+
CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag,
BuildStorage& Storage,
const Oid& BuildId,