aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-27 16:08:47 +0100
committerGitHub Enterprise <[email protected]>2025-03-27 16:08:47 +0100
commit013ac818cd09c1d31bf9411e00b2bbbf02defa3f (patch)
treecdc94b8fe80f0c5db20f0417d76e5351fe480f4a
parentMerge pull request #317 from ue-foundation/zs/ui-show-cook-artifacts (diff)
downloadzen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.tar.xz
zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.zip
build cache prime (#327)
- Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive - Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched.
-rw-r--r--CHANGELOG.md2
-rw-r--r--src/zen/cmds/builds_cmd.cpp1087
-rw-r--r--src/zen/cmds/builds_cmd.h6
-rw-r--r--src/zenutil/buildstoragecache.cpp59
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h7
-rw-r--r--src/zenutil/include/zenutil/workerpools.h3
-rw-r--r--src/zenutil/workerpools.cpp14
7 files changed, 724 insertions, 454 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c6dccb44..155654ca1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@
- `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs
- Feature: zen: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata
- Feature: zenserver: Add command line option `--gc-buildstore-duration-seconds` to control GC life time of build store data
+ - Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive
+ - Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched.
- Improvement: Do partial requests of blocks if not all of the block is needed
- Improvement: Better progress/statistics on upload and download
- Improvement: Scavenge .zen temp folders for existing data (downloaded, decompressed or written) from previous failed run
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 08d30948b..3a54de935 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -91,7 +91,16 @@ namespace {
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
- const bool SingleThreaded = false;
+ const bool SingleThreaded = false;
+ bool BoostWorkerThreads = false;
+
+ WorkerThreadPool& GetIOWorkerPool()
+ {
+ return SingleThreaded ? GetSyncWorkerPool()
+ : BoostWorkerThreads ? GetLargeWorkerPool(EWorkloadType::Burst)
+ : GetMediumWorkerPool(EWorkloadType::Burst);
+ }
+ WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); }
const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u;
@@ -438,7 +447,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
UsePlainProgress ? 5000 : 200,
[](bool, std::ptrdiff_t) {},
AbortFlag);
@@ -452,7 +461,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent FolderContent = ChunkFolderContent(
ChunkingStats,
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
Content,
ChunkController,
@@ -1563,8 +1572,8 @@ namespace {
BlockAttachments.size() - VerifyBlockDescriptions.size()));
}
- WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
- WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& NetworkPool = GetNetworkPool();
+ WorkerThreadPool& VerifyPool = GetIOWorkerPool();
ParallellWork Work(AbortFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -1946,8 +1955,8 @@ namespace {
RwLock Lock;
- WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
- WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool();
+ WorkerThreadPool& UploadBlocksPool = GetNetworkPool();
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
@@ -2147,8 +2156,8 @@ namespace {
{
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
- WorkerThreadPool& UploadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& ReadChunkPool = GetIOWorkerPool();
+ WorkerThreadPool& UploadChunkPool = GetNetworkPool();
FilteredRate FilteredGenerateBlockBytesPerSecond;
FilteredRate FilteredCompressedBytesPerSecond;
@@ -2363,7 +2372,7 @@ namespace {
if (!AbortFlag)
{
Work.ScheduleWork(
- SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
+ ReadChunkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2420,7 +2429,7 @@ namespace {
{
const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
Work.ScheduleWork(
- SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
+ ReadChunkPool,
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2697,54 +2706,52 @@ namespace {
FindBlocksStatistics FindBlocksStats;
- std::future<PrepareBuildResult> PrepBuildResultFuture =
- GetSmallWorkerPool(EWorkloadType::Burst)
- .EnqueueTask(std::packaged_task<PrepareBuildResult()>{
- [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
- ZEN_TRACE_CPU("PrepareBuild");
+ std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{
+ [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
+ ZEN_TRACE_CPU("PrepareBuild");
- PrepareBuildResult Result;
- Stopwatch Timer;
- if (CreateBuild)
- {
- ZEN_TRACE_CPU("CreateBuild");
+ PrepareBuildResult Result;
+ Stopwatch Timer;
+ if (CreateBuild)
+ {
+ ZEN_TRACE_CPU("CreateBuild");
- Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
- Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
- Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
- Result.PayloadSize = MetaData.GetSize();
- }
- else
- {
- ZEN_TRACE_CPU("PutBuild");
- Stopwatch GetBuildTimer;
- CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
- Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
- Result.PayloadSize = Build.GetSize();
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
- {
- Result.PreferredMultipartChunkSize = ChunkSize;
- }
- else if (AllowMultiparts)
- {
- ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
- NiceBytes(Result.PreferredMultipartChunkSize));
- }
- }
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = MetaData.GetSize();
+ }
+ else
+ {
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (AllowMultiparts)
+ {
+ ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
+ }
- if (!IgnoreExistingBlocks)
- {
- ZEN_TRACE_CPU("FindBlocks");
- Stopwatch KnownBlocksTimer;
- Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
- FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
- FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
- Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
- }
- Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- return Result;
- }});
+ if (!IgnoreExistingBlocks)
+ {
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
+ FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }});
ChunkedFolderContent LocalContent;
@@ -2852,7 +2859,7 @@ namespace {
}
return true;
},
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
@@ -2911,7 +2918,7 @@ namespace {
FilteredBytesHashed.Start();
LocalContent = ChunkFolderContent(
ChunkingStats,
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
Content,
*ChunkController,
@@ -3663,7 +3670,7 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& VerifyPool = GetIOWorkerPool();
ParallellWork Work(AbortFlag);
@@ -4712,6 +4719,7 @@ namespace {
const std::vector<IoHash>& LooseChunkHashes,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
+ bool PrimeCacheOnly,
FolderContent& OutLocalFolderState,
DiskStatistics& DiskStats,
CacheMappingStatistics& CacheMappingStats,
@@ -4721,7 +4729,7 @@ namespace {
{
ZEN_TRACE_CPU("UpdateFolder");
- ZEN_UNUSED(WipeTargetFolder);
+ ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests)));
Stopwatch IndexTimer;
@@ -4741,6 +4749,7 @@ namespace {
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
@@ -4786,6 +4795,7 @@ namespace {
}
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
@@ -4831,8 +4841,10 @@ namespace {
}
std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- // Pick up all whole files we can use from current local state
+
+ if (!PrimeCacheOnly)
{
+ // Pick up all whole files we can use from current local state
ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
RemoteSequenceIndex++)
@@ -4870,6 +4882,15 @@ namespace {
}
}
}
+ else
+ {
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ }
+ }
// Pick up all chunks in current local state
struct CacheCopyData
{
@@ -4887,6 +4908,7 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
@@ -5004,8 +5026,8 @@ namespace {
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
- WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
- WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& NetworkPool = GetNetworkPool();
+ WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -5097,191 +5119,205 @@ namespace {
const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
if (!BlockChunkIndexNeeded.empty())
{
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ if (PrimeCacheOnly)
{
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
-
+ TotalRequestCount++;
TotalPartWriteCount++;
- std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- if (std::filesystem::exists(BlockPath))
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else
+ {
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
{
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
+
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ if (std::filesystem::exists(BlockPath))
+ {
+ CachedChunkBlockIndexes.push_back(BlockIndex);
+ UsingCachedBlock = true;
+ }
}
- }
- if (!UsingCachedBlock)
- {
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload =
- (BlockDescription.HeaderSize > 0) &&
- (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
- if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ if (!UsingCachedBlock)
{
- std::vector<BlockRangeDescriptor> BlockRanges;
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload =
+ (BlockDescription.HeaderSize > 0) &&
+ (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ std::vector<BlockRangeDescriptor> BlockRanges;
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
- uint32_t NeedBlockChunkIndexOffset = 0;
- uint32_t ChunkBlockIndex = 0;
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
- const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
- ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
- {
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
{
- if (NextRange.RangeLength > 0)
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {.BlockIndex = BlockIndex};
+ }
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
+ if (NextRange.RangeLength == 0)
+ {
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
}
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength == 0)
+ else
{
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ ZEN_ASSERT(false);
}
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
}
- else
+ if (NextRange.RangeLength > 0)
{
- ZEN_ASSERT(false);
+ BlockRanges.push_back(NextRange);
}
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
- ZEN_ASSERT(!BlockRanges.empty());
+ ZEN_ASSERT(!BlockRanges.empty());
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
- auto It = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*It++);
- while (It != BlockRanges.end())
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
- uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
- if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
- {
- LastRange.ChunkBlockIndexCount =
- (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
- }
- else
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ while (It != BlockRanges.end())
{
- CollapsedBlockRanges.push_back(*It);
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ {
+ LastRange.ChunkBlockIndexCount =
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*It);
+ }
+ ++It;
}
- ++It;
- }
-
- const std::uint64_t WantedSize = std::accumulate(
- CollapsedBlockRanges.begin(),
- CollapsedBlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- ZEN_ASSERT(WantedSize <= TotalBlockSize);
- if (WantedSize > ((TotalBlockSize * 95) / 100))
- {
- ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block",
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize));
- TotalRequestCount++;
- TotalPartWriteCount++;
-
- FullBlockWorks.push_back(BlockIndex);
- }
- else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1)
- {
- ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block",
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- CollapsedBlockRanges.size());
- TotalRequestCount++;
- TotalPartWriteCount++;
- FullBlockWorks.push_back(BlockIndex);
- }
- else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16))
- {
- ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block",
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- CollapsedBlockRanges.size());
- TotalRequestCount++;
- TotalPartWriteCount++;
+ const std::uint64_t WantedSize = std::accumulate(
+ CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ ZEN_ASSERT(WantedSize <= TotalBlockSize);
+ if (WantedSize > ((TotalBlockSize * 95) / 100))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ TotalRequestCount++;
+ TotalPartWriteCount++;
- FullBlockWorks.push_back(BlockIndex);
- }
- else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48))
- {
- ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block",
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- CollapsedBlockRanges.size());
- TotalRequestCount++;
- TotalPartWriteCount++;
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1)
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16))
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48))
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64))
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else
+ {
+ if (WantedSize > ((TotalBlockSize * 5) / 10))
+ {
+ ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block",
+ (WantedSize * 100) / TotalBlockSize,
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ }
+ TotalRequestCount += CollapsedBlockRanges.size();
+ TotalPartWriteCount += CollapsedBlockRanges.size();
- FullBlockWorks.push_back(BlockIndex);
+ BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ }
}
- else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64))
+ else
{
- ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block",
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- CollapsedBlockRanges.size());
TotalRequestCount++;
TotalPartWriteCount++;
FullBlockWorks.push_back(BlockIndex);
}
- else
- {
- if (WantedSize > ((TotalBlockSize * 5) / 10))
- {
- ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block",
- (WantedSize * 100) / TotalBlockSize,
- NiceBytes(WantedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- CollapsedBlockRanges.size());
- }
- TotalRequestCount += CollapsedBlockRanges.size();
- TotalPartWriteCount += CollapsedBlockRanges.size();
-
- BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
- }
- }
- else
- {
- TotalRequestCount++;
- TotalPartWriteCount++;
-
- FullBlockWorks.push_back(BlockIndex);
}
}
}
@@ -5363,6 +5399,12 @@ namespace {
std::move(LooseChunkHashWork.ChunkTargetPtrs);
const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
Work.ScheduleWork(
WritePool,
[&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
@@ -5370,6 +5412,7 @@ namespace {
{
ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
std::filesystem::path ExistingCompressedChunkPath;
+ if (!PrimeCacheOnly)
{
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
@@ -5481,6 +5524,7 @@ namespace {
NetworkPool,
[&Path,
&Storage,
+ PrimeCacheOnly,
BuildId,
&RemoteContent,
&RemoteLookup,
@@ -5503,54 +5547,67 @@ namespace {
&DownloadStats](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- FilteredDownloadedBytesPerSecond.Start();
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
{
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(*Storage.BuildStorage,
- Path / ZenTempDownloadFolderName,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!AbortFlag)
- {
- AsyncWriteDownloadedChunk(
- Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- });
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
}
- else
+ if (!BuildBlob)
{
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
- IoBuffer BuildBlob;
- if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash))
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(
+ *Storage.BuildStorage,
+ Path / ZenTempDownloadFolderName,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ });
}
- if (!BuildBlob)
+ else
{
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
if (BuildBlob && Storage.BuildCacheStorage)
{
@@ -5560,34 +5617,37 @@ namespace {
BuildBlob.GetContentType(),
CompositeBuffer(SharedBuffer(BuildBlob)));
}
- }
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- if (!AbortFlag)
- {
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BuildBlob)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
}
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
}
}
}
@@ -5602,6 +5662,7 @@ namespace {
for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
@@ -5775,6 +5836,7 @@ namespace {
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
@@ -5829,6 +5891,7 @@ namespace {
for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
@@ -6000,6 +6063,13 @@ namespace {
{
break;
}
+
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
Work.ScheduleWork(
NetworkPool,
[&, BlockIndex](std::atomic<bool>&) {
@@ -6011,8 +6081,10 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer;
- if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
{
BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
}
@@ -6042,119 +6114,123 @@ namespace {
FilteredDownloadedBytesPerSecond.Stop();
}
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (!PrimeCacheOnly)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
- WritePartsComplete++;
+ WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
}
@@ -6176,19 +6252,24 @@ namespace {
(DownloadStats.RequestsCompleteCount == TotalRequestCount)
? ""
: fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
- std::string Details = fmt::format("{}/{} ({}{}) downloaded. {}/{} ({}B/s) written.",
- DownloadStats.RequestsCompleteCount.load(),
- TotalRequestCount,
- NiceBytes(DownloadedBytes),
- DownloadRateString,
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceBytes(BytesToWrite),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState({.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(BytesToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())},
- false);
+ std::string WriteDetails = PrimeCacheOnly ? ""
+ : fmt::format(" {}/{} ({}B/s) written.",
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ DownloadStats.RequestsCompleteCount.load(),
+ TotalRequestCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ WriteDetails);
+ WriteProgressBar.UpdateState(
+ {.Task = PrimeCacheOnly ? "Downloading " : "Writing chunks ",
+ .Details = Details,
+ .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
+ .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
+ : (BytesToWrite - DiskStats.WriteByteCount.load())},
+ false);
});
}
@@ -6202,21 +6283,24 @@ namespace {
WriteProgressBar.Finish();
- uint32_t RawSequencesMissingWriteCount = 0;
- for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ if (!PrimeCacheOnly)
{
- const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
- if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
{
- RawSequencesMissingWriteCount++;
- const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
- ZEN_ASSERT(!IncompletePath.empty());
- const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ {
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ }
}
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
}
- ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
@@ -6234,6 +6318,11 @@ namespace {
WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
+ if (PrimeCacheOnly)
+ {
+ return;
+ }
+
// Move all files we will reuse to cache folder
// TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
if (!LocalPathIndexesMatchingSequenceIndexes.empty())
@@ -6319,7 +6408,7 @@ namespace {
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
Stopwatch Timer;
- WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -6930,7 +7019,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
@@ -7003,7 +7092,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
UpdatedContent,
ChunkController,
@@ -7080,7 +7169,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
CurrentLocalFolderContent,
ChunkController,
@@ -7122,10 +7211,13 @@ namespace {
bool AllowMultiparts,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
- bool PostDownloadVerify)
+ bool PostDownloadVerify,
+ bool PrimeCacheOnly)
{
ZEN_TRACE_CPU("DownloadFolder");
+ ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests)));
+
Stopwatch DownloadTimer;
const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName;
@@ -7150,27 +7242,30 @@ namespace {
ChunkedFolderContent RemoteContent =
GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes);
- const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
- if (!ChunkController)
- {
- ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
- ChunkController = CreateBasicChunkingController();
- }
-
+ const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
GetFolderContentStatistics LocalFolderScanStats;
ChunkingStatistics ChunkingStats;
ChunkedFolderContent LocalContent;
- if (std::filesystem::is_directory(Path))
+ if (!PrimeCacheOnly)
{
- if (!WipeTargetFolder)
+ if (std::filesystem::is_directory(Path))
+ {
+ if (!WipeTargetFolder)
+ {
+ if (!ChunkController)
+ {
+ ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
+ ChunkController = CreateBasicChunkingController();
+ }
+
+ LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController);
+ }
+ }
+ else
{
- LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController);
+ CreateDirectories(Path);
}
}
- else
- {
- CreateDirectories(Path);
- }
if (AbortFlag)
{
return;
@@ -7251,6 +7346,7 @@ namespace {
LooseChunkHashes,
AllowPartialBlockRequests,
WipeTargetFolder,
+ PrimeCacheOnly,
LocalFolderState,
DiskStats,
CacheMappingStats,
@@ -7260,20 +7356,23 @@ namespace {
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
+ if (!PrimeCacheOnly)
+ {
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
- Stopwatch WriteStateTimer;
- CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
+ Stopwatch WriteStateTimer;
+ CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
- CreateDirectories((Path / ZenStateFilePath).parent_path());
- TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView());
- ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
+ CreateDirectories((Path / ZenStateFilePath).parent_path());
+ TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView());
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
#if 0
- ExtendableStringBuilder<1024> SB;
- CompactBinaryToJson(StateObject, SB);
- WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+ ExtendableStringBuilder<1024> SB;
+ CompactBinaryToJson(StateObject, SB);
+ WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
+ }
const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
DownloadStats.DownloadedPartialBlockCount.load();
const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
@@ -7307,6 +7406,23 @@ namespace {
NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
}
+ if (PrimeCacheOnly)
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) {
+ if (Remaining == 0)
+ {
+ ZEN_CONSOLE("Build cache upload complete");
+ }
+ else
+ {
+ ZEN_CONSOLE("Waiting for build cache to complete uploading. {} blobs remaining", Remaining);
+ }
+ return !AbortFlag;
+ });
+ }
+ }
if (CleanDirectory(ZenTempFolder, {}))
{
std::filesystem::remove(ZenTempFolder);
@@ -7584,6 +7700,15 @@ BuildsCommand::BuildsCommand()
Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
};
+ auto AddWorkerOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("",
+ "",
+ "boost-workers",
+ "Increase the number of worker threads - may cause computer to less responsive",
+ cxxopts::value(m_BoostWorkerThreads),
+ "<boostworkers>");
+ };
+
m_Options.add_option("",
"v",
"verb",
@@ -7618,6 +7743,7 @@ BuildsCommand::BuildsCommand()
AddFileOptions(m_UploadOptions);
AddOutputOptions(m_UploadOptions);
AddCacheOptions(m_UploadOptions);
+ AddWorkerOptions(m_UploadOptions);
m_UploadOptions.add_options()("h,help", "Print help");
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_UploadOptions.add_option("",
@@ -7682,6 +7808,15 @@ BuildsCommand::BuildsCommand()
AddFileOptions(m_DownloadOptions);
AddOutputOptions(m_DownloadOptions);
AddCacheOptions(m_DownloadOptions);
+
+ m_DownloadOptions.add_option("cache",
+ "",
+ "cache-prime-only",
+ "Only download blobs missing in cache and upload to cache",
+ cxxopts::value(m_PrimeCacheOnly),
+ "<cacheprimeonly>");
+
+ AddWorkerOptions(m_DownloadOptions);
m_DownloadOptions.add_options()("h,help", "Print help");
m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
@@ -7719,6 +7854,7 @@ BuildsCommand::BuildsCommand()
m_DownloadOptions.positional_help("local-path build-id build-part-name");
AddOutputOptions(m_DiffOptions);
+ AddWorkerOptions(m_DiffOptions);
m_DiffOptions.add_options()("h,help", "Print help");
m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>");
@@ -7735,6 +7871,7 @@ BuildsCommand::BuildsCommand()
AddFileOptions(m_TestOptions);
AddOutputOptions(m_TestOptions);
AddCacheOptions(m_TestOptions);
+ AddWorkerOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_TestOptions.add_option("",
@@ -7765,6 +7902,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_ValidateBuildPartOptions);
AddFileOptions(m_ValidateBuildPartOptions);
AddOutputOptions(m_ValidateBuildPartOptions);
+ AddWorkerOptions(m_ValidateBuildPartOptions);
m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
m_ValidateBuildPartOptions.add_option("",
"",
@@ -7786,6 +7924,7 @@ BuildsCommand::BuildsCommand()
AddFileOptions(m_MultiTestDownloadOptions);
AddOutputOptions(m_MultiTestDownloadOptions);
AddCacheOptions(m_MultiTestDownloadOptions);
+ AddWorkerOptions(m_MultiTestDownloadOptions);
m_MultiTestDownloadOptions
.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>");
@@ -7991,9 +8130,13 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
.RetryCount = 0});
if (Result.CacheHttp->Get("/health").IsSuccess())
{
- Result.BuildCacheStorage =
- CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache");
- CacheDescription = fmt::format("zen cache {}. SessionId: '{}'", m_ZenCacheHost, Result.CacheHttp->GetSessionId());
+ Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ TempPath / "zencache",
+ m_PrimeCacheOnly);
+ CacheDescription = fmt::format("zen cache {}. SessionId: '{}'", m_ZenCacheHost, Result.CacheHttp->GetSessionId());
if (!m_Namespace.empty())
{
CacheDescription += fmt::format(" {}.", m_Namespace);
@@ -8014,6 +8157,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return Result;
};
+ BoostWorkerThreads = m_BoostWorkerThreads;
+
try
{
if (SubOption == &m_ListOptions)
@@ -8243,6 +8388,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
}
+ if (m_PostDownloadVerify && m_PrimeCacheOnly)
+ {
+ throw zen::OptionParseException(
+ fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help()));
+ }
+
+ if (m_Clean && m_PrimeCacheOnly)
+ {
+ ZEN_WARN("ignoring 'clean' option when 'cache-prime-only' is enabled");
+ }
+
+ if (m_AllowPartialBlockRequests && m_PrimeCacheOnly)
+ {
+ ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled");
+ }
+
std::vector<Oid> BuildPartIds;
for (const std::string& BuildPartId : m_BuildPartIds)
{
@@ -8267,9 +8428,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_BuildPartNames,
Path,
m_AllowMultiparts,
- m_AllowPartialBlockRequests,
+ m_AllowPartialBlockRequests && !m_PrimeCacheOnly,
m_Clean,
- m_PostDownloadVerify);
+ m_PostDownloadVerify,
+ m_PrimeCacheOnly);
if (false)
{
@@ -8350,7 +8512,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
m_AllowPartialBlockRequests,
BuildIdString == m_BuildIds.front(),
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Download cancelled");
@@ -8441,7 +8604,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ true,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -8453,7 +8625,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -8504,7 +8685,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SourceSize > 256)
{
Work.ScheduleWork(
- SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -8557,7 +8738,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -8595,7 +8785,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8611,7 +8810,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
m_AllowPartialBlockRequests,
false,
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8627,7 +8827,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_AllowMultiparts,
m_AllowPartialBlockRequests,
false,
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index b5af236e1..46257a567 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -27,8 +27,9 @@ private:
std::string m_SystemRootDir;
- bool m_PlainProgress = false;
- bool m_Verbose = false;
+ bool m_PlainProgress = false;
+ bool m_Verbose = false;
+ bool m_BoostWorkerThreads = false;
// cloud builds
std::string m_BuildsUrl;
@@ -42,6 +43,7 @@ private:
// cache
std::string m_ZenCacheHost;
+ bool m_PrimeCacheOnly = false;
std::string m_BuildId;
bool m_CreateBuild = false;
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index c95215889..f273ac699 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -11,6 +11,7 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
@@ -27,13 +28,16 @@ public:
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
: m_HttpClient(HttpClient)
, m_Stats(Stats)
, m_Namespace(Namespace.empty() ? "none" : Namespace)
, m_Bucket(Bucket.empty() ? "none" : Bucket)
, m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
- , m_BackgroundWorkPool(1)
+ , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
+ , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background))
, m_PendingBackgroundWorkCount(1)
, m_CancelBackgroundWork(false)
{
@@ -44,8 +48,11 @@ public:
try
{
m_CancelBackgroundWork.store(true);
- m_PendingBackgroundWorkCount.CountDown();
- m_PendingBackgroundWorkCount.Wait();
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
}
catch (const std::exception& Ex)
{
@@ -86,6 +93,7 @@ public:
ZenContentType ContentType,
const CompositeBuffer& Payload) override
{
+ ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
ScheduleBackgroundWork(
[this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
@@ -132,6 +140,7 @@ public:
virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
{
+ ZEN_ASSERT(!IsFlushed);
ScheduleBackgroundWork([this,
BuildId = Oid(BuildId),
BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
@@ -329,6 +338,39 @@ public:
return {};
}
+ virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ {
+ if (IsFlushed)
+ {
+ return;
+ }
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ IsFlushed = true;
+ }
+ if (m_PendingBackgroundWorkCount.Wait(100))
+ {
+ return;
+ }
+ while (true)
+ {
+ intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
+ if (UpdateCallback(Remaining))
+ {
+ if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ {
+ UpdateCallback(0);
+ return;
+ }
+ }
+ else
+ {
+ m_CancelBackgroundWork.store(true);
+ }
+ }
+ }
+
private:
void AddStatistic(const HttpClient::Response& Result)
{
@@ -343,8 +385,10 @@ private:
const std::string m_Namespace;
const std::string m_Bucket;
const std::filesystem::path m_TempFolderPath;
+ const bool m_BoostBackgroundThreadCount;
+ bool IsFlushed = false;
- WorkerThreadPool m_BackgroundWorkPool;
+ WorkerThreadPool& m_BackgroundWorkPool;
Latch m_PendingBackgroundWorkCount;
std::atomic<bool> m_CancelBackgroundWork;
};
@@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient,
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
{
- return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath);
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
}
} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index 08c936bf5..cab35328d 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -42,11 +42,16 @@ public:
};
virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ virtual void Flush(
+ int32_t UpdateInteralMS,
+ std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
};
std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath);
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount);
} // namespace zen
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 9683ad720..df2033bca 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
+// Worker pool with minimum number of worker threads, but at least one thread
+WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType);
+
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
WorkerThreadPool& GetSyncWorkerPool();
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index e3165e838..797034978 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u));
const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u));
const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
+ const int TinyWorkerThreadPoolTreadCount = 1;
static bool IsShutDown = false;
@@ -35,6 +36,9 @@ namespace {
WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"};
WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"};
+ WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"};
+ WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"};
+
WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"};
WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool)
@@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType)
}
WorkerThreadPool&
+GetTinyWorkerPool(EWorkloadType WorkloadType)
+{
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool);
+}
+
+WorkerThreadPool&
GetSyncWorkerPool()
{
return EnsurePoolPtr(SyncWorkerPool);
@@ -91,6 +101,8 @@ ShutdownWorkerPools()
BackgroundMediumWorkerPool.Pool.reset();
BurstSmallWorkerPool.Pool.reset();
BackgroundSmallWorkerPool.Pool.reset();
+ BurstTinyWorkerPool.Pool.reset();
+ BackgroundTinyWorkerPool.Pool.reset();
SyncWorkerPool.Pool.reset();
}
} // namespace zen