aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-26 17:06:23 +0100
committerGitHub Enterprise <[email protected]>2025-03-26 17:06:23 +0100
commit28bc5ebf05984385cc0567c89b1d8e7a541ebef8 (patch)
tree424efc19bc8630d33f76d3372f9105731d00a45f /src/zen/cmds/builds_cmd.cpp
parentdon't let auth env argument block other auth options (#316) (diff)
downloadarchived-zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.tar.xz
archived-zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.zip
zen build cache service (#318)
- **EXPERIMENTAL** `zen builds` - Feature: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata - Feature: New `/builds` endpoint for caching build blobs and blob metadata - `/builds/{namespace}/{bucket}/{buildid}/blobs/{hash}` `GET` and `PUT` method for storing and fetching blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata` `POST` method for storing metadata about blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata` `POST` method for fetching metadata about blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1883
1 files changed, 1071 insertions, 812 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 889ccef0b..b2ad579f1 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -20,6 +20,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenutil/buildstoragecache.h>
#include <zenutil/chunkblock.h>
#include <zenutil/chunkedcontent.h>
#include <zenutil/chunkedfile.h>
@@ -51,6 +52,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define EXTRA_VERIFY 0
+#define ZEN_CLOUD_STORAGE "Cloud Storage"
+
namespace zen {
namespace {
static std::atomic<bool> AbortFlag = false;
@@ -87,6 +90,8 @@ namespace {
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
+ const bool SingleThreaded = false;
+
const std::string ZenFolderName = ".zen";
const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
@@ -204,22 +209,27 @@ namespace {
{
try
{
- std::filesystem::remove(LocalFilePath);
- }
- catch (const std::exception&)
- {
- // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
- Sleep(200);
- try
- {
- std::filesystem::remove(LocalFilePath);
- }
- catch (const std::exception& Ex)
+ std::error_code Ec;
+ std::filesystem::remove(LocalFilePath, Ec);
+ if (Ec)
{
- ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
- CleanWipe = false;
+ // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
+ Ec.clear();
+ if (std::filesystem::exists(LocalFilePath, Ec) || Ec)
+ {
+ Sleep(200);
+ if (std::filesystem::exists(LocalFilePath))
+ {
+ std::filesystem::remove(LocalFilePath);
+ }
+ }
}
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
+ CleanWipe = false;
+ }
}
for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
@@ -425,7 +435,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
[](bool, std::ptrdiff_t) {},
AbortFlag);
@@ -439,7 +449,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent FolderContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
Content,
ChunkController,
@@ -603,11 +613,9 @@ namespace {
struct WriteChunkStatistics
{
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> ChunkBytesWritten = 0;
- uint64_t DownloadTimeUs = 0;
- uint64_t WriteTimeUs = 0;
- uint64_t WriteChunksElapsedWallTimeUs = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
};
struct RebuildFolderStateStatistics
@@ -626,6 +634,15 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
+ struct StorageInstance
+ {
+ std::unique_ptr<HttpClient> BuildStorageHttp;
+ std::unique_ptr<BuildStorage> BuildStorage;
+ std::string StorageName;
+ std::unique_ptr<HttpClient> CacheHttp;
+ std::unique_ptr<BuildStorageCache> BuildCacheStorage;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -772,7 +789,7 @@ namespace {
std::span<const uint32_t> ChunkCounts,
std::span<const IoHash> LocalChunkHashes,
std::span<const uint64_t> LocalChunkRawSizes,
- std::vector<uint32_t> AbsoluteChunkOrders,
+ const std::vector<uint32_t>& AbsoluteChunkOrders,
const std::span<const uint32_t> LooseLocalChunkIndexes,
const std::span<IoHash> BlockHashes)
{
@@ -1444,7 +1461,7 @@ namespace {
for (auto& WorkItem : WorkItems)
{
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(),//
+ NetworkPool,
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
ZEN_TRACE_CPU("DownloadLargeBlob_Work");
if (!AbortFlag)
@@ -1513,15 +1530,16 @@ namespace {
}
ValidateStats.BlockAttachmentCount = BlockAttachments.size();
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
+ std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
+ ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments));
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
{
throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
BlockAttachments.size() - VerifyBlockDescriptions.size()));
}
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ParallellWork Work(AbortFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -1881,7 +1899,7 @@ namespace {
void GenerateBuildBlocks(const std::filesystem::path& Path,
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
- BuildStorage& Storage,
+ StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::vector<uint32_t>>& NewBlockChunks,
GeneratedBlocks& OutBlocks,
@@ -1904,9 +1922,8 @@ namespace {
RwLock Lock;
- WorkerThreadPool& GenerateBlobsPool =
- GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
- WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
+ WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
@@ -2005,21 +2022,35 @@ namespace {
const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- Storage.PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
+
+ Storage.BuildStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
UploadStats.BlocksBytes += CompressedBlockSize;
+
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(CompressedBlockSize),
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- Storage.PutBlockMetadata(BuildId,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(BlockMetaData.GetSize()));
OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2074,7 +2105,7 @@ namespace {
}
}
- void UploadPartBlobs(BuildStorage& Storage,
+ void UploadPartBlobs(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
const ChunkedFolderContent& Content,
@@ -2092,8 +2123,8 @@ namespace {
{
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& UploadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
FilteredRate FilteredGenerateBlockBytesPerSecond;
FilteredRate FilteredCompressedBytesPerSecond;
@@ -2177,18 +2208,26 @@ namespace {
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(PayloadSize),
NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
UploadedBlockSize += PayloadSize;
UploadStats.BlocksBytes += PayloadSize;
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2214,12 +2253,17 @@ namespace {
ZEN_TRACE_CPU("AsyncUploadLooseChunk");
const uint64_t PayloadSize = Payload.GetSize();
- ;
+
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
if (PayloadSize >= LargeAttachmentSize)
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
UploadStats.MultipartAttachmentCount++;
- std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob(
+ std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob(
BuildId,
RawHash,
ZenContentType::kCompressedBinary,
@@ -2232,7 +2276,7 @@ namespace {
PartPayload.SetContentType(ZenContentType::kBinary);
return PartPayload;
},
- [&, RawSize](uint64_t SentBytes, bool IsComplete) {
+ [&, Payload, RawSize](uint64_t SentBytes, bool IsComplete) {
UploadStats.ChunksBytes += SentBytes;
UploadedCompressedChunkSize += SentBytes;
if (IsComplete)
@@ -2264,7 +2308,7 @@ namespace {
else
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
- Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
UploadStats.ChunksBytes += Payload.GetSize();
UploadStats.ChunkCount++;
@@ -2303,7 +2347,7 @@ namespace {
if (!AbortFlag)
{
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool()
+ SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2362,7 +2406,7 @@ namespace {
{
const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex];
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool,
+ SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2598,7 +2642,7 @@ namespace {
return FilteredReuseBlockIndexes;
};
- void UploadFolder(BuildStorage& Storage,
+ void UploadFolder(StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
const std::string_view BuildPartName,
@@ -2654,7 +2698,7 @@ namespace {
ZEN_TRACE_CPU("CreateBuild");
Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData);
+ CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
Result.PayloadSize = MetaData.GetSize();
@@ -2663,7 +2707,7 @@ namespace {
{
ZEN_TRACE_CPU("PutBuild");
Stopwatch GetBuildTimer;
- CbObject Build = Storage.GetBuild(BuildId);
+ CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
Result.PayloadSize = Build.GetSize();
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
@@ -2681,7 +2725,7 @@ namespace {
{
ZEN_TRACE_CPU("FindBlocks");
Stopwatch KnownBlocksTimer;
- Result.KnownBlocks = Storage.FindBlocks(BuildId);
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
@@ -2796,10 +2840,10 @@ namespace {
}
return true;
},
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
- ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
+ ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
},
AbortFlag);
}
@@ -2855,7 +2899,7 @@ namespace {
FilteredBytesHashed.Start();
LocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
Content,
*ChunkController,
@@ -2985,7 +3029,7 @@ namespace {
(FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount)
: 0.0;
ZEN_CONSOLE(
- "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n"
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
" Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
" Accepting {} ({}) redundant chunks ({:.1f}%)\n"
" Rejected {} ({}) chunks in {} blocks\n"
@@ -3204,7 +3248,8 @@ namespace {
}
Stopwatch PutBuildPartResultTimer;
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.",
NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
NiceBytes(PartManifest.GetSize()),
@@ -3289,7 +3334,7 @@ namespace {
while (!AbortFlag)
{
Stopwatch FinalizeBuildPartTimer;
- std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash);
+ std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash);
ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.",
NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
Needs.size());
@@ -3304,7 +3349,7 @@ namespace {
if (CreateBuild && !AbortFlag)
{
Stopwatch FinalizeBuildTimer;
- Storage.FinalizeBuild(BuildId);
+ Storage.BuildStorage->FinalizeBuild(BuildId);
ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
}
@@ -3321,7 +3366,13 @@ namespace {
{
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
UploadStats.BlocksBytes += BlockMetaData.GetSize();
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
UploadBlockMetadataCount++;
@@ -3340,7 +3391,7 @@ namespace {
DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
ZEN_CONSOLE_VERBOSE(
@@ -3570,7 +3621,7 @@ namespace {
ValidateInfo);
- Storage.PutBuildPartStats(
+ Storage.BuildStorage->PutBuildPartStats(
BuildId,
BuildPartId,
{{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())},
@@ -3597,7 +3648,7 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ParallellWork Work(AbortFlag);
@@ -3873,6 +3924,22 @@ namespace {
return ChunkTargetPtrs;
};
+ uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex)
+ {
+ uint64_t WriteCount = 0;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ WriteCount++;
+ }
+ }
+ return WriteCount;
+ };
+
void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
{
ZEN_TRACE_CPU("FinalizeChunkSequence");
@@ -3892,8 +3959,39 @@ namespace {
}
}
+ void VerifySequence(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ uint32_t RemoteSequenceIndex)
+ {
+ ZEN_TRACE_CPU("VerifySequence");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex];
+ IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ const uint64_t VerifySize = VerifyBuffer.GetSize();
+ if (VerifySize != ExpectedSize)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
+ SequenceRawHash,
+ VerifySize,
+ ExpectedSize));
+ }
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ }
+
void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
std::span<const uint32_t> RemoteSequenceIndexes,
ParallellWork& Work,
WorkerThreadPool& VerifyPool)
@@ -3908,40 +4006,24 @@ namespace {
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
Work.ScheduleWork(
VerifyPool,
- [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
+ if (!AbortFlag)
{
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
},
Work.DefaultErrorFunction());
}
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
- }
- }
FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
@@ -3985,8 +4067,7 @@ namespace {
const BlockWriteOps& Ops,
ParallellWork& Work,
WorkerThreadPool& VerifyPool,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
@@ -4017,12 +4098,6 @@ namespace {
FileOffset,
RemoteContent.RawSizes[PathIndex]);
}
- WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
- WriteChunkStats.ChunkBytesWritten +=
- std::accumulate(Ops.ChunkBuffers.begin(),
- Ops.ChunkBuffers.end(),
- uint64_t(0),
- [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
@@ -4036,7 +4111,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4162,8 +4237,7 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -4197,8 +4271,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4222,8 +4295,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4240,8 +4312,7 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
@@ -4267,8 +4338,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
else
@@ -4355,8 +4425,7 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4390,7 +4459,6 @@ namespace {
DiskStats.ReadByteCount += SourceSize;
if (!AbortFlag)
{
- WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
DecompressedTemp.Write(RangeBuffer, Offset);
for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
{
@@ -4424,7 +4492,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
- WriteChunkStats.ChunkCountWritten++;
+ // WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4433,8 +4501,7 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
@@ -4444,7 +4511,7 @@ namespace {
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
}
else
{
@@ -4459,8 +4526,6 @@ namespace {
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
OpenFileCache);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
@@ -4479,8 +4544,7 @@ namespace {
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4527,7 +4591,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&,
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
@@ -4565,8 +4629,7 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
+ DiskStats);
if (!AbortFlag)
{
WritePartsComplete++;
@@ -4581,7 +4644,12 @@ namespace {
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
}
else
{
@@ -4593,7 +4661,7 @@ namespace {
Work.DefaultErrorFunction());
};
- void UpdateFolder(BuildStorage& Storage,
+ void UpdateFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
const std::uint64_t LargeAttachmentSize,
@@ -4667,8 +4735,8 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CacheMappingStats.CacheSequenceHashesCount += SequenceSize;
- CacheMappingStats.CacheSequenceHashesByteCount++;
+ CacheMappingStats.CacheSequenceHashesCount++;
+ CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
continue;
}
}
@@ -4869,21 +4937,17 @@ namespace {
NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
}
- uint32_t ChunkCountToWrite = 0;
+ uint64_t BytesToWrite = 0;
+
for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
{
- ChunkCountToWrite++;
- }
- else
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
- if (!ChunkTargetPtrs.empty())
+ BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
- ChunkCountToWrite++;
}
}
}
@@ -4900,8 +4964,8 @@ namespace {
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -4922,7 +4986,7 @@ namespace {
const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
continue;
}
bool NeedsCopy = true;
@@ -4933,7 +4997,7 @@ namespace {
if (ChunkTargetPtrs.empty())
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
}
else
{
@@ -5025,6 +5089,10 @@ namespace {
uint32_t CurrentOffset =
gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+
BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
@@ -5064,6 +5132,7 @@ namespace {
}
ZEN_ASSERT(!BlockRanges.empty());
+
std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
auto It = BlockRanges.begin();
CollapsedBlockRanges.push_back(*It++);
@@ -5085,10 +5154,87 @@ namespace {
++It;
}
- TotalRequestCount += CollapsedBlockRanges.size();
- TotalPartWriteCount += CollapsedBlockRanges.size();
+ const std::uint64_t WantedSize = std::accumulate(
+ CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ ZEN_ASSERT(WantedSize <= TotalBlockSize);
+ if (WantedSize > ((TotalBlockSize * 95) / 100))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1)
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else
+ {
+ if (WantedSize > ((TotalBlockSize * 5) / 10))
+ {
+ ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block",
+ (WantedSize * 100) / TotalBlockSize,
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ }
+ TotalRequestCount += CollapsedBlockRanges.size();
+ TotalPartWriteCount += CollapsedBlockRanges.size();
- BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ }
}
else
{
@@ -5101,10 +5247,69 @@ namespace {
}
else
{
- ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
+ struct BlobsExistsResult
+ {
+ tsl::robin_set<IoHash> ExistingBlobs;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ BlobsExistsResult ExistsResult;
+
+ if (Storage.BuildCacheStorage)
+ {
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ tsl::robin_set<IoHash> BlobHashesSet;
+
+ BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size());
+ for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ }
+ for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks)
+ {
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+
+ if (!BlobHashesSet.empty())
+ {
+ const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
+ }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty())
+ {
+ ZEN_CONSOLE("Found {} out of {} needed blobs in remote cache in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
+ }
+ }
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
if (AbortFlag)
@@ -5119,7 +5324,7 @@ namespace {
const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
Work.ScheduleWork(
- WritePool, // NetworkPool, // GetSyncWorkerPool(),//
+ WritePool,
[&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
@@ -5152,151 +5357,202 @@ namespace {
}
}
}
- if (!ExistingCompressedChunkPath.empty())
+ if (!AbortFlag)
+
{
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Path,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
- WritePartsComplete++;
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ DiskStats);
+ WritePartsComplete++;
- if (!AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
- }
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- std::filesystem::remove(CompressedChunkPath);
+ std::filesystem::remove(CompressedChunkPath);
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- if (NeedHashVerify)
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Path,
+ &Storage,
+ BuildId,
+ &RemoteContent,
+ &RemoteLookup,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ &DownloadStats](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ FilteredDownloadedBytesPerSecond.Start();
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- CompletedSequences,
- Work,
- WritePool);
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(*Storage.BuildStorage,
+ Path / ZenTempDownloadFolderName,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(
+ Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ });
}
else
{
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
+ IoBuffer BuildBlob;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash))
+ {
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
+ }
+ if (!BuildBlob)
+ {
+ BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
+ if (BuildBlob && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ BuildBlob.GetContentType(),
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
}
}
- }
- },
- Work.DefaultErrorFunction());
- }
- else
- {
- FilteredDownloadedBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(Storage,
- Path / ZenTempDownloadFolderName,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
- });
- }
- else
- {
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
-
- IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash);
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
+ },
+ Work.DefaultErrorFunction());
}
}
}
@@ -5312,7 +5568,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, CopyDataIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5439,16 +5695,6 @@ namespace {
ChunkSource,
Op.Target->Offset,
RemoteContent.RawSizes[RemotePathIndex]);
- for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++)
- {
- const WriteOp& WrittenOp = WriteOps[WrittenOpIndex];
- if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second)
- {
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex];
- }
- }
CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
@@ -5469,10 +5715,13 @@ namespace {
}
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
RemoteContent,
+ RemoteLookup,
CompletedChunkSequences,
Work,
WritePool);
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_CONSOLE_VERBOSE("Copied {} from {}",
+ NiceBytes(CacheLocalFileBytesRead),
+ LocalContent.Paths[LocalPathIndex]);
}
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
@@ -5492,7 +5741,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(), // WritePool,
+ WritePool,
[&, BlockIndex](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
@@ -5509,27 +5758,29 @@ namespace {
fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
}
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
- WritePartsComplete++;
- std::filesystem::remove(BlockChunkPath);
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ std::filesystem::remove(BlockChunkPath);
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
},
@@ -5546,7 +5797,7 @@ namespace {
ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
const uint32_t BlockIndex = BlockRange.BlockIndex;
Work.ScheduleWork(
- NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ NetworkPool,
[&, BlockIndex, BlockRange](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5555,131 +5806,148 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer =
- Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
}
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BlockBuffer)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (!AbortFlag)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, // GetSyncWorkerPool(),
- [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
- std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
+ std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -5693,7 +5961,7 @@ namespace {
break;
}
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ NetworkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5702,133 +5970,152 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ }
if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
}
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BlockBuffer)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
+ if (!AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- std::filesystem::path BlockChunkPath;
+ std::filesystem::path BlockChunkPath;
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
- WritePartsComplete++;
+ WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -5840,27 +6127,28 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
- std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ std::string DownloadRateString =
+ (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded. {}/{} ({}B/s) written.",
DownloadStats.RequestsCompleteCount.load(),
TotalRequestCount,
NiceBytes(DownloadedBytes),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- WriteChunkStats.ChunkCountWritten.load(),
- ChunkCountToWrite,
+ DownloadRateString,
NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceBytes(BytesToWrite),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState(
- {.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
- false);
+ WriteProgressBar.UpdateState({.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(BytesToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())},
+ false);
});
}
@@ -5981,7 +6269,7 @@ namespace {
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
Stopwatch Timer;
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ProgressBar RebuildProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -6019,7 +6307,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -6070,6 +6358,10 @@ namespace {
TargetsComplete++;
while (TargetOffset < (BaseTargetOffset + TargetCount))
{
+ if (AbortFlag)
+ {
+ break;
+ }
ZEN_TRACE_CPU("FinalizeTree_Copy");
ZEN_ASSERT(Targets[TargetOffset].first == RawHash);
@@ -6140,17 +6432,15 @@ namespace {
std::vector<std::pair<Oid, std::string>> Result;
{
Stopwatch GetBuildTimer;
-
- std::vector<std::pair<Oid, std::string>> AvailableParts;
-
- CbObject BuildObject = Storage.GetBuild(BuildId);
-
- ZEN_CONSOLE("GetBuild took {}. Name: '{}', Payload size: {}",
+ CbObject BuildObject = Storage.GetBuild(BuildId);
+ ZEN_CONSOLE("GetBuild took {}. Name: '{}' ({}, {}), Payload size: {}",
NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()),
- BuildObject["BuildName"sv].AsString(),
+ BuildObject["name"sv].AsString(),
+ BuildObject["type"sv].AsString(),
+ BuildObject["Configuration"sv].AsString(),
NiceBytes(BuildObject.GetSize()));
- ZEN_DEBUG("Build object: {}", BuildObject);
+ ZEN_CONSOLE_VERBOSE("Build object: {}", BuildObject);
CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
if (!PartsObject)
@@ -6160,6 +6450,8 @@ namespace {
OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize);
+ std::vector<std::pair<Oid, std::string>> AvailableParts;
+
for (CbFieldView PartView : PartsObject)
{
const std::string BuildPartName = std::string(PartView.GetName());
@@ -6221,7 +6513,7 @@ namespace {
return Result;
}
- ChunkedFolderContent GetRemoteContent(BuildStorage& Storage,
+ ChunkedFolderContent GetRemoteContent(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::pair<Oid, std::string>>& BuildParts,
std::unique_ptr<ChunkingController>& OutChunkController,
@@ -6234,7 +6526,7 @@ namespace {
Stopwatch GetBuildPartTimer;
const Oid BuildPartId = BuildParts[0].first;
const std::string_view BuildPartName = BuildParts[0].second;
- CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
BuildPartId,
BuildPartName,
@@ -6248,7 +6540,7 @@ namespace {
OutChunkController = CreateChunkingController(ChunkerName, Parameters);
}
- auto ParseBuildPartManifest = [](BuildStorage& Storage,
+ auto ParseBuildPartManifest = [](StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
CbObject BuildPartManifest,
@@ -6274,12 +6566,103 @@ namespace {
// TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them
- Stopwatch GetBlockMetadataTimer;
- OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes);
- ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
- BuildPartId,
- NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
- OutBlockDescriptions.size());
+ {
+ Stopwatch GetBlockMetadataTimer;
+
+ std::vector<ChunkBlockDescription> UnorderedList;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
+ if (Storage.BuildCacheStorage)
+ {
+ std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
+ UnorderedList.reserve(CacheBlockMetadatas.size());
+ for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size();
+ CacheBlockMetadataIndex++)
+ {
+ const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex];
+ ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block");
+ }
+ else
+ {
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ }
+ }
+
+ if (UnorderedList.size() < BlockRawHashes.size())
+ {
+ std::vector<IoHash> RemainingBlockHashes;
+ RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size());
+ for (const IoHash& BlockRawHash : BlockRawHashes)
+ {
+ if (!BlockDescriptionLookup.contains(BlockRawHash))
+ {
+ RemainingBlockHashes.push_back(BlockRawHash);
+ }
+ }
+ CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes);
+ std::vector<ChunkBlockDescription> RemainingList;
+ {
+ CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView();
+ std::vector<IoHash> FoundBlockHashes;
+ std::vector<CbObject> FoundBlockMetadatas;
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block");
+ }
+ else
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize());
+ Block.CopyTo(MetaBuffer.GetMutableView());
+ CbObject BlockMetadata(MetaBuffer.MoveToShared());
+
+ FoundBlockHashes.push_back(Description.BlockHash);
+ FoundBlockMetadatas.push_back(BlockMetadata);
+ }
+ RemainingList.emplace_back(std::move(Description));
+ }
+ }
+ if (Storage.BuildCacheStorage && !FoundBlockHashes.empty())
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas);
+ }
+ }
+
+ for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = RemainingList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex);
+ }
+ UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end());
+ }
+
+ OutBlockDescriptions.reserve(BlockDescriptionLookup.size());
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
+ {
+ OutBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
+ }
+ }
+
+ ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ OutBlockDescriptions.size());
+ }
if (OutBlockDescriptions.size() != BlockRawHashes.size())
{
@@ -6292,7 +6675,8 @@ namespace {
ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription);
std::vector<ChunkBlockDescription> AugmentedBlockDescriptions;
AugmentedBlockDescriptions.reserve(BlockRawHashes.size());
- std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId);
+ std::vector<ChunkBlockDescription> FoundBlocks =
+ ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
for (const IoHash& BlockHash : BlockRawHashes)
{
@@ -6315,7 +6699,7 @@ namespace {
}
else
{
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash);
+ IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash);
if (!BlockBuffer)
{
throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash));
@@ -6381,7 +6765,7 @@ namespace {
const Oid& OverlayBuildPartId = BuildParts[PartIndex].first;
const std::string& OverlayBuildPartName = BuildParts[PartIndex].second;
Stopwatch GetOverlayBuildPartTimer;
- CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId);
+ CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
OverlayBuildPartId,
OverlayBuildPartName,
@@ -6486,9 +6870,11 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
- [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); },
+ [&](bool, std::ptrdiff_t) {
+ ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
+ },
AbortFlag);
if (AbortFlag)
{
@@ -6557,7 +6943,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
UpdatedContent,
ChunkController,
@@ -6609,8 +6995,6 @@ namespace {
{
LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
}
-
- ZEN_CONSOLE("Using cached local state");
}
ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
ScanContent = false;
@@ -6636,7 +7020,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
CurrentLocalFolderContent,
ChunkController,
@@ -6670,7 +7054,7 @@ namespace {
return LocalContent;
}
- void DownloadFolder(BuildStorage& Storage,
+ void DownloadFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<Oid>& BuildPartIds,
std::span<const std::string> BuildPartNames,
@@ -6694,7 +7078,7 @@ namespace {
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
std::vector<std::pair<Oid, std::string>> AllBuildParts =
- ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
+ ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
std::vector<ChunkedFolderContent> PartContents;
@@ -6786,7 +7170,7 @@ namespace {
{
BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path);
FolderContent LocalFolderState;
DiskStatistics DiskStats;
@@ -7131,6 +7515,10 @@ BuildsCommand::BuildsCommand()
"<jsonmetadata>");
};
+ auto AddCacheOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ };
+
auto AddOutputOptions = [this](cxxopts::Options& Ops) {
Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>");
Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
@@ -7169,6 +7557,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_UploadOptions);
AddFileOptions(m_UploadOptions);
AddOutputOptions(m_UploadOptions);
+ AddCacheOptions(m_UploadOptions);
m_UploadOptions.add_options()("h,help", "Print help");
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_UploadOptions.add_option("",
@@ -7232,6 +7621,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_DownloadOptions);
AddFileOptions(m_DownloadOptions);
AddOutputOptions(m_DownloadOptions);
+ AddCacheOptions(m_DownloadOptions);
m_DownloadOptions.add_options()("h,help", "Print help");
m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
@@ -7284,6 +7674,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_TestOptions);
AddFileOptions(m_TestOptions);
AddOutputOptions(m_TestOptions);
+ AddCacheOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_TestOptions.add_option("",
@@ -7304,6 +7695,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_FetchBlobOptions);
AddFileOptions(m_FetchBlobOptions);
AddOutputOptions(m_FetchBlobOptions);
+ AddCacheOptions(m_FetchBlobOptions);
m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
m_FetchBlobOptions
.add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>");
@@ -7333,6 +7725,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_MultiTestDownloadOptions);
AddFileOptions(m_MultiTestDownloadOptions);
AddOutputOptions(m_MultiTestDownloadOptions);
+ AddCacheOptions(m_MultiTestDownloadOptions);
m_MultiTestDownloadOptions
.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>");
@@ -7385,6 +7778,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help()));
}
}
+ else if (m_StoragePath.empty())
+ {
+ throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
+ }
};
std::unique_ptr<AuthMgr> Auth;
@@ -7393,7 +7790,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto CreateAuthMgr = [&]() {
if (!Auth)
{
- std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir);
+ std::filesystem::path DataRoot = m_SystemRootDir.empty()
+ ? PickDefaultSystemRootDirectory()
+ : std::filesystem::absolute(StringToPath(m_SystemRootDir)).make_preferred();
if (m_EncryptionKey.empty())
{
@@ -7448,7 +7847,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else if (!m_AccessTokenPath.empty())
{
- std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath));
+ std::string ResolvedAccessToken =
+ ReadAccessTokenFromFile(std::filesystem::absolute(StringToPath(m_AccessTokenPath)).make_preferred());
if (!ResolvedAccessToken.empty())
{
ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken);
@@ -7486,15 +7886,63 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
ParseOutputOptions();
+ auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats,
+ BuildStorageCache::Statistics& StorageCacheStats,
+ const std::filesystem::path& TempPath) -> StorageInstance {
+ ParseStorageOptions();
+
+ StorageInstance Result;
+
+ if (!m_BuildsUrl.empty())
+ {
+ ParseAuthOptions();
+ Result.BuildStorageHttp = std::make_unique<HttpClient>(m_BuildsUrl, ClientSettings);
+ ZEN_CONSOLE("Using cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
+ m_BuildsUrl,
+ Result.BuildStorageHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket);
+ Result.BuildStorage =
+ CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage");
+ Result.StorageName = ZEN_CLOUD_STORAGE;
+ }
+ else if (!m_StoragePath.empty())
+ {
+ std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred();
+ ZEN_CONSOLE("Using folder {}", StoragePath);
+ Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ Result.StorageName = fmt::format("Disk {}", StoragePath.stem());
+ }
+ else
+ {
+ throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
+ }
+ if (!m_ZenCacheHost.empty())
+ {
+ Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost,
+ HttpClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = false,
+ .AllowResume = true,
+ .RetryCount = 0});
+ if (Result.CacheHttp->Get("/health").IsSuccess())
+ {
+ Result.BuildCacheStorage =
+ CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache");
+ }
+ else
+ {
+ Result.CacheHttp.reset();
+ }
+ }
+ return Result;
+ };
+
try
{
if (SubOption == &m_ListOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
CbObject QueryObject;
if (m_ListQueryPath.empty())
{
@@ -7505,7 +7953,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath);
+ std::filesystem::path ListQueryPath = std::filesystem::absolute(StringToPath(m_ListQueryPath)).make_preferred();
if (ToLower(ListQueryPath.extension().string()) == ".cbo")
{
QueryObject = LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath));
@@ -7525,28 +7973,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{});
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
- CbObject Response = Storage->ListBuilds(QueryObject);
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderName);
+
+ CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None);
if (m_ListResultPath.empty())
{
@@ -7556,7 +7987,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListResultPath = StringToPath(m_ListResultPath);
+ std::filesystem::path ListResultPath = std::filesystem::absolute(StringToPath(m_ListResultPath)).make_preferred();
if (ToLower(ListResultPath.extension().string()) == ".cbo")
{
MemoryView ResponseView = Response.GetView();
@@ -7575,11 +8006,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help()));
@@ -7610,7 +8036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path));
if (m_BuildPartName.empty())
{
@@ -7645,48 +8071,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help()));
}
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
+
BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'",
- m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'",
- m_BuildPartName,
- Path,
- StoragePath,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
CbObject MetaData;
if (m_CreateBuild)
{
if (!m_BuildMetadataPath.empty())
{
- std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath);
+ std::filesystem::path MetadataPath = std::filesystem::absolute(StringToPath(m_BuildMetadataPath));
IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten();
std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
std::string JsonError;
@@ -7713,12 +8111,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
Path,
- StringToPath(m_ManifestPath),
+ std::filesystem::absolute(StringToPath(m_ManifestPath)).make_preferred(),
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData,
@@ -7735,7 +8133,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7751,11 +8149,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
@@ -7786,37 +8179,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- BuildId,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId,
BuildPartIds,
m_BuildPartNames,
@@ -7835,7 +8205,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7859,8 +8229,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
- std::filesystem::path DiffPath = StringToPath(m_DiffPath);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
+ std::filesystem::path DiffPath = std::filesystem::absolute(StringToPath(m_DiffPath)).make_preferred();
DiffFolders(Path, DiffPath, m_OnlyChunked);
return AbortFlag ? 11 : 0;
}
@@ -7872,10 +8242,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
// m_StoragePath = "D:\\buildstorage";
// m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient";
// std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587",
@@ -7886,34 +8252,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// "07d3964f919d577a321a1fdd",
// "07d396a6ce875004e16b9528"};
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- FormatArray<std::string>(m_BuildIds, " "sv),
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
@@ -7923,7 +8267,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help()));
}
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId,
{},
{},
@@ -7945,36 +8289,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_TestOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
m_BuildId = Oid::NewOid().ToString();
m_BuildPartName = Path.filename().string();
m_BuildPartId = Oid::NewOid().ToString();
m_CreateBuild = true;
- BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred();
if (m_BuildsUrl.empty() && StoragePath.empty())
{
StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred();
CreateDirectories(StoragePath);
CleanDirectory(StoragePath, {});
+ m_StoragePath = StoragePath.generic_string();
}
auto _ = MakeGuard([&]() {
if (m_BuildsUrl.empty() && StoragePath.empty())
@@ -7983,33 +8320,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!StoragePath.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- StoragePath,
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
auto MakeMetaData = [](const Oid& BuildId) -> CbObject {
CbObjectWriter BuildMetaDataWriter;
@@ -8032,7 +8346,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
@@ -8052,7 +8366,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -8064,7 +8378,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -8115,7 +8429,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SourceSize > 256)
{
Work.ScheduleWork(
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -8168,7 +8482,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -8187,7 +8501,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId2,
BuildPartId2,
m_BuildPartName,
@@ -8206,7 +8520,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8214,7 +8528,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
@@ -8230,7 +8544,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
@@ -8250,11 +8564,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_BlobHash.empty())
{
throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help()));
@@ -8266,44 +8575,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help()));
}
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
- {
- throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
- }
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
- BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
- std::filesystem::path Path = StringToPath(m_Path);
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize);
+ ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize);
if (AbortFlag)
{
return 11;
@@ -8317,10 +8600,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
+ // HttpClient Http(m_BuildsUrl, ClientSettings);
if (m_BuildsUrl.empty() && m_StoragePath.empty())
{
@@ -8342,39 +8622,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
}
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
+
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ BuildStorageCache::Statistics StorageCacheStats;
- std::filesystem::path Path = StringToPath(m_Path);
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
ValidateStatistics ValidateStats;
DownloadStatistics DownloadStats;
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}