diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-26 17:06:23 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-26 17:06:23 +0100 |
| commit | 28bc5ebf05984385cc0567c89b1d8e7a541ebef8 (patch) | |
| tree | 424efc19bc8630d33f76d3372f9105731d00a45f /src/zen/cmds/builds_cmd.cpp | |
| parent | don't let auth env argument block other auth options (#316) (diff) | |
| download | archived-zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.tar.xz archived-zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.zip | |
zen build cache service (#318)
- **EXPERIMENTAL** `zen builds`
- Feature: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata
- Feature: New `/builds` endpoint for caching build blobs and blob metadata
- `/builds/{namespace}/{bucket}/{buildid}/blobs/{hash}` `GET` and `PUT` method for storing and fetching blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata` `POST` method for storing metadata about blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata` `POST` method for fetching metadata about blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1883 |
1 files changed, 1071 insertions, 812 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 889ccef0b..b2ad579f1 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -20,6 +20,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenutil/buildstoragecache.h> #include <zenutil/chunkblock.h> #include <zenutil/chunkedcontent.h> #include <zenutil/chunkedfile.h> @@ -51,6 +52,8 @@ ZEN_THIRD_PARTY_INCLUDES_END #define EXTRA_VERIFY 0 +#define ZEN_CLOUD_STORAGE "Cloud Storage" + namespace zen { namespace { static std::atomic<bool> AbortFlag = false; @@ -87,6 +90,8 @@ namespace { const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; + const bool SingleThreaded = false; + const std::string ZenFolderName = ".zen"; const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); @@ -204,22 +209,27 @@ namespace { { try { - std::filesystem::remove(LocalFilePath); - } - catch (const std::exception&) - { - // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time - Sleep(200); - try - { - std::filesystem::remove(LocalFilePath); - } - catch (const std::exception& Ex) + std::error_code Ec; + std::filesystem::remove(LocalFilePath, Ec); + if (Ec) { - ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what()); - CleanWipe = false; + // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time + Ec.clear(); + if (std::filesystem::exists(LocalFilePath, Ec) || Ec) + { + Sleep(200); + if (std::filesystem::exists(LocalFilePath)) + { + std::filesystem::remove(LocalFilePath); + } + } } } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what()); + CleanWipe = false; + } } for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) @@ -425,7 +435,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); @@ -439,7 +449,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, ChunkController, @@ -603,11 +613,9 @@ namespace { struct WriteChunkStatistics { - std::atomic<uint32_t> ChunkCountWritten = 0; - std::atomic<uint64_t> ChunkBytesWritten = 0; - uint64_t DownloadTimeUs = 0; - uint64_t WriteTimeUs = 0; - uint64_t WriteChunksElapsedWallTimeUs = 0; + uint64_t DownloadTimeUs = 0; + uint64_t WriteTimeUs = 0; + uint64_t WriteChunksElapsedWallTimeUs = 0; }; struct RebuildFolderStateStatistics @@ -626,6 +634,15 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; + struct StorageInstance + { + std::unique_ptr<HttpClient> BuildStorageHttp; + std::unique_ptr<BuildStorage> BuildStorage; + std::string StorageName; + std::unique_ptr<HttpClient> CacheHttp; + std::unique_ptr<BuildStorageCache> BuildCacheStorage; + }; + std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, const std::span<const uint32_t> LocalChunkOrder, const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, @@ -772,7 +789,7 @@ namespace { std::span<const uint32_t> ChunkCounts, std::span<const IoHash> LocalChunkHashes, std::span<const uint64_t> LocalChunkRawSizes, - std::vector<uint32_t> AbsoluteChunkOrders, + const std::vector<uint32_t>& AbsoluteChunkOrders, const std::span<const uint32_t> LooseLocalChunkIndexes, const std::span<IoHash> BlockHashes) { @@ -1444,7 +1461,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(),// + NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { ZEN_TRACE_CPU("DownloadLargeBlob_Work"); if (!AbortFlag) @@ -1513,15 +1530,16 @@ namespace { } ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments); + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments)); if (VerifyBlockDescriptions.size() != BlockAttachments.size()) { throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", BlockAttachments.size() - VerifyBlockDescriptions.size())); } - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ParallellWork Work(AbortFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -1881,7 +1899,7 @@ namespace { void GenerateBuildBlocks(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, - BuildStorage& Storage, + StorageInstance& Storage, const Oid& BuildId, const std::vector<std::vector<uint32_t>>& NewBlockChunks, GeneratedBlocks& OutBlocks, @@ -1904,9 +1922,8 @@ namespace { RwLock Lock; - WorkerThreadPool& GenerateBlobsPool = - GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// - WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// + WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; @@ -2005,21 +2022,35 @@ namespace { const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - Storage.PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } + + Storage.BuildStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); UploadStats.BlocksBytes += CompressedBlockSize; + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(CompressedBlockSize), OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - Storage.PutBlockMetadata(BuildId, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(BlockMetaData.GetSize())); OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2074,7 +2105,7 @@ namespace { } } - void UploadPartBlobs(BuildStorage& Storage, + void UploadPartBlobs(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& Content, @@ -2092,8 +2123,8 @@ namespace { { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& UploadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; @@ -2177,18 +2208,26 @@ namespace { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + } + Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); UploadedBlockSize += PayloadSize; UploadStats.BlocksBytes += PayloadSize; - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(BlockMetaData.GetSize())); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2214,12 +2253,17 @@ namespace { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); - ; + + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + } + if (PayloadSize >= LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); UploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob( + std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob( BuildId, RawHash, ZenContentType::kCompressedBinary, @@ -2232,7 +2276,7 @@ namespace { PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, - [&, RawSize](uint64_t SentBytes, bool IsComplete) { + [&, Payload, RawSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) @@ -2264,7 +2308,7 @@ namespace { else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; @@ -2303,7 +2347,7 @@ namespace { if (!AbortFlag) { Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool() + SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2362,7 +2406,7 @@ namespace { { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, + SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2598,7 +2642,7 @@ namespace { return FilteredReuseBlockIndexes; }; - void UploadFolder(BuildStorage& Storage, + void UploadFolder(StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, @@ -2654,7 +2698,7 @@ namespace { ZEN_TRACE_CPU("CreateBuild"); Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); + CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); Result.PayloadSize = MetaData.GetSize(); @@ -2663,7 +2707,7 @@ namespace { { ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; - CbObject Build = Storage.GetBuild(BuildId); + CbObject Build = Storage.BuildStorage->GetBuild(BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); Result.PayloadSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) @@ -2681,7 +2725,7 @@ namespace { { ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; - Result.KnownBlocks = Storage.FindBlocks(BuildId); + Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); @@ -2796,10 +2840,10 @@ namespace { } return true; }, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { - ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); + ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); } @@ -2855,7 +2899,7 @@ namespace { FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, *ChunkController, @@ -2985,7 +3029,7 @@ namespace { (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) : 0.0; ZEN_CONSOLE( - "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n" + "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" @@ -3204,7 +3248,8 @@ namespace { } Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), @@ -3289,7 +3334,7 @@ namespace { while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); + std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); @@ -3304,7 +3349,7 @@ namespace { if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; - Storage.FinalizeBuild(BuildId); + Storage.BuildStorage->FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } @@ -3321,7 +3366,13 @@ namespace { { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; @@ -3340,7 +3391,7 @@ namespace { DownloadStatistics ValidateDownloadStats; if (PostUploadVerify && !AbortFlag) { - ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); } ZEN_CONSOLE_VERBOSE( @@ -3570,7 +3621,7 @@ namespace { ValidateInfo); - Storage.PutBuildPartStats( + Storage.BuildStorage->PutBuildPartStats( BuildId, BuildPartId, {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())}, @@ -3597,7 +3648,7 @@ namespace { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ParallellWork Work(AbortFlag); @@ -3873,6 +3924,22 @@ namespace { return ChunkTargetPtrs; }; + uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex) + { + uint64_t WriteCount = 0; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + WriteCount++; + } + } + return WriteCount; + }; + void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) { ZEN_TRACE_CPU("FinalizeChunkSequence"); @@ -3892,8 +3959,39 @@ namespace { } } + void VerifySequence(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + uint32_t RemoteSequenceIndex) + { + ZEN_TRACE_CPU("VerifySequence"); + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex]; + IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + const uint64_t VerifySize = VerifyBuffer.GetSize(); + if (VerifySize != ExpectedSize) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", + SequenceRawHash, + VerifySize, + ExpectedSize)); + } + ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } + } + void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, std::span<const uint32_t> RemoteSequenceIndexes, ParallellWork& Work, WorkerThreadPool& VerifyPool) @@ -3908,40 +4006,24 @@ namespace { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork( VerifyPool, - [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); + if (!AbortFlag) { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(TargetFolder, SequenceRawHash); } - FinalizeChunkSequence(TargetFolder, SequenceRawHash); } }, Work.DefaultErrorFunction()); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); - } - } FinalizeChunkSequence(TargetFolder, SequenceRawHash); } @@ -3985,8 +4067,7 @@ namespace { const BlockWriteOps& Ops, ParallellWork& Work, WorkerThreadPool& VerifyPool, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); { @@ -4017,12 +4098,6 @@ namespace { FileOffset, RemoteContent.RawSizes[PathIndex]); } - WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); - WriteChunkStats.ChunkBytesWritten += - std::accumulate(Ops.ChunkBuffers.begin(), - Ops.ChunkBuffers.end(), - uint64_t(0), - [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); }); } if (!AbortFlag) { @@ -4036,7 +4111,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool); + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); } } @@ -4162,8 +4237,7 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -4197,8 +4271,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4222,8 +4295,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4240,8 +4312,7 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); @@ -4267,8 +4338,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } else @@ -4355,8 +4425,7 @@ namespace { void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); @@ -4390,7 +4459,6 @@ namespace { DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { - WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize(); DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { @@ -4424,7 +4492,7 @@ namespace { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); } - WriteChunkStats.ChunkCountWritten++; + // WriteChunkStats.ChunkCountWritten++; } bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, @@ -4433,8 +4501,7 @@ namespace { const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, IoBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); @@ -4444,7 +4511,7 @@ namespace { { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats); + StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); } else { @@ -4459,8 +4526,6 @@ namespace { ChunkTargetPtrs, CompositeBuffer(std::move(Chunk)), OpenFileCache); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += ChunkRawSize; return true; } } @@ -4479,8 +4544,7 @@ namespace { std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4527,7 +4591,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, SequenceIndexChunksLeftToWriteCounters, CompressedChunkPath, @@ -4565,8 +4629,7 @@ namespace { ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), - DiskStats, - WriteChunkStats); + DiskStats); if (!AbortFlag) { WritePartsComplete++; @@ -4581,7 +4644,12 @@ namespace { CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); } else { @@ -4593,7 +4661,7 @@ namespace { Work.DefaultErrorFunction()); }; - void UpdateFolder(BuildStorage& Storage, + void UpdateFolder(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, const std::uint64_t LargeAttachmentSize, @@ -4667,8 +4735,8 @@ namespace { if (SequenceSize == CacheDirContent.FileSizes[Index]) { CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - CacheMappingStats.CacheSequenceHashesCount += SequenceSize; - CacheMappingStats.CacheSequenceHashesByteCount++; + CacheMappingStats.CacheSequenceHashesCount++; + CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; continue; } } @@ -4869,21 +4937,17 @@ namespace { NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); } - uint32_t ChunkCountToWrite = 0; + uint64_t BytesToWrite = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); + if (ChunkWriteCount > 0) { - ChunkCountToWrite++; - } - else - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - if (!ChunkTargetPtrs.empty()) + BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; - ChunkCountToWrite++; } } } @@ -4900,8 +4964,8 @@ namespace { FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -4922,7 +4986,7 @@ namespace { const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); continue; } bool NeedsCopy = true; @@ -4933,7 +4997,7 @@ namespace { if (ChunkTargetPtrs.empty()) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); } else { @@ -5025,6 +5089,10 @@ namespace { uint32_t CurrentOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) @@ -5064,6 +5132,7 @@ namespace { } ZEN_ASSERT(!BlockRanges.empty()); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; auto It = BlockRanges.begin(); CollapsedBlockRanges.push_back(*It++); @@ -5085,10 +5154,87 @@ namespace { ++It; } - TotalRequestCount += CollapsedBlockRanges.size(); - TotalPartWriteCount += CollapsedBlockRanges.size(); + const std::uint64_t WantedSize = std::accumulate( + CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + ZEN_ASSERT(WantedSize <= TotalBlockSize); + if (WantedSize > ((TotalBlockSize * 95) / 100)) + { + ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1) + { + ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16)) + { + ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48)) + { + ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64)) + { + ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else + { + if (WantedSize > ((TotalBlockSize * 5) / 10)) + { + ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block", + (WantedSize * 100) / TotalBlockSize, + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + } + TotalRequestCount += CollapsedBlockRanges.size(); + TotalPartWriteCount += CollapsedBlockRanges.size(); - BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + } } else { @@ -5101,10 +5247,69 @@ namespace { } else { - ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } + struct BlobsExistsResult + { + tsl::robin_set<IoHash> ExistingBlobs; + uint64_t ElapsedTimeMs = 0; + }; + + BlobsExistsResult ExistsResult; + + if (Storage.BuildCacheStorage) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + tsl::robin_set<IoHash> BlobHashesSet; + + BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size()); + for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + } + for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks) + { + const uint32_t BlockIndex = BlockRange.BlockIndex; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + for (uint32_t BlockIndex : FullBlockWorks) + { + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + + if (!BlobHashesSet.empty()) + { + const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (CacheExistsResult[BlobIndex].HasBody) + { + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } + } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty()) + { + ZEN_CONSOLE("Found {} out of {} needed blobs in remote cache in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } + } + } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { if (AbortFlag) @@ -5119,7 +5324,7 @@ namespace { const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; Work.ScheduleWork( - WritePool, // NetworkPool, // GetSyncWorkerPool(),// + WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { if (!AbortFlag) { @@ -5152,151 +5357,202 @@ namespace { } } } - if (!ExistingCompressedChunkPath.empty()) + if (!AbortFlag) + { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Path, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + WritePool, + [&Path, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats, - WriteChunkStats); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]; - WritePartsComplete++; + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); - } + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } - std::filesystem::remove(CompressedChunkPath); + std::filesystem::remove(CompressedChunkPath); - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } + } + } + }, + Work.DefaultErrorFunction()); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &Storage, + BuildId, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + FilteredDownloadedBytesPerSecond.Start(); + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - CompletedSequences, - Work, - WritePool); + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob(*Storage.BuildStorage, + Path / ZenTempDownloadFolderName, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!AbortFlag) + { + AsyncWriteDownloadedChunk( + Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + }); } else { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + IoBuffer BuildBlob; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash)) + { + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); + } + if (!BuildBlob) + { + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); + } + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } } } - } - }, - Work.DefaultErrorFunction()); - } - else - { - FilteredDownloadedBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) - { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob(Storage, - Path / ZenTempDownloadFolderName, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - WriteChunkStats); - }); - } - else - { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - - IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash); - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - WriteChunkStats); + }, + Work.DefaultErrorFunction()); } } } @@ -5312,7 +5568,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, CopyDataIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5439,16 +5695,6 @@ namespace { ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); - for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++) - { - const WriteOp& WrittenOp = WriteOps[WrittenOpIndex]; - if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second) - { - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex]; - } - } CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? @@ -5469,10 +5715,13 @@ namespace { } VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, + RemoteLookup, CompletedChunkSequences, Work, WritePool); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + ZEN_CONSOLE_VERBOSE("Copied {} from {}", + NiceBytes(CacheLocalFileBytesRead), + LocalContent.Paths[LocalPathIndex]); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) @@ -5492,7 +5741,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(), // WritePool, + WritePool, [&, BlockIndex](std::atomic<bool>&) mutable { if (!AbortFlag) { @@ -5509,27 +5758,29 @@ namespace { fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - WritePartsComplete++; - std::filesystem::remove(BlockChunkPath); - if (WritePartsComplete == TotalPartWriteCount) + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + std::filesystem::remove(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } }, @@ -5546,7 +5797,7 @@ namespace { ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; Work.ScheduleWork( - NetworkPool, // NetworkPool, // GetSyncWorkerPool() + NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) { if (!AbortFlag) { @@ -5555,131 +5806,148 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = - Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); } - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BlockBuffer) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + if (!AbortFlag) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + FilteredDownloadedBytesPerSecond.Stop(); + } - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, // GetSyncWorkerPool(), - [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( - std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( + std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -5693,7 +5961,7 @@ namespace { break; } Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + NetworkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5702,133 +5970,152 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); + + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + } if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (BlockBuffer && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlockBuffer))); + } } - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BlockBuffer) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } + if (!AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - std::filesystem::path BlockChunkPath; + std::filesystem::path BlockChunkPath; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + // Check if the dowloaded block is file based and we can move it directly without rewriting it { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } - WritePartsComplete++; + WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -5840,27 +6127,28 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load()); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", + std::string DownloadRateString = + (DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string Details = fmt::format("{}/{} ({}{}) downloaded. {}/{} ({}B/s) written.", DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, NiceBytes(DownloadedBytes), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - WriteChunkStats.ChunkCountWritten.load(), - ChunkCountToWrite, + DownloadRateString, NiceBytes(DiskStats.WriteByteCount.load()), + NiceBytes(BytesToWrite), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); - WriteProgressBar.UpdateState( - {.Task = "Writing chunks ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())}, - false); + WriteProgressBar.UpdateState({.Task = "Writing chunks ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(BytesToWrite), + .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())}, + false); }); } @@ -5981,7 +6269,7 @@ namespace { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); Stopwatch Timer; - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -6019,7 +6307,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { if (!AbortFlag) { @@ -6070,6 +6358,10 @@ namespace { TargetsComplete++; while (TargetOffset < (BaseTargetOffset + TargetCount)) { + if (AbortFlag) + { + break; + } ZEN_TRACE_CPU("FinalizeTree_Copy"); ZEN_ASSERT(Targets[TargetOffset].first == RawHash); @@ -6140,17 +6432,15 @@ namespace { std::vector<std::pair<Oid, std::string>> Result; { Stopwatch GetBuildTimer; - - std::vector<std::pair<Oid, std::string>> AvailableParts; - - CbObject BuildObject = Storage.GetBuild(BuildId); - - ZEN_CONSOLE("GetBuild took {}. Name: '{}', Payload size: {}", + CbObject BuildObject = Storage.GetBuild(BuildId); + ZEN_CONSOLE("GetBuild took {}. Name: '{}' ({}, {}), Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), - BuildObject["BuildName"sv].AsString(), + BuildObject["name"sv].AsString(), + BuildObject["type"sv].AsString(), + BuildObject["Configuration"sv].AsString(), NiceBytes(BuildObject.GetSize())); - ZEN_DEBUG("Build object: {}", BuildObject); + ZEN_CONSOLE_VERBOSE("Build object: {}", BuildObject); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) @@ -6160,6 +6450,8 @@ namespace { OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); + std::vector<std::pair<Oid, std::string>> AvailableParts; + for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); @@ -6221,7 +6513,7 @@ namespace { return Result; } - ChunkedFolderContent GetRemoteContent(BuildStorage& Storage, + ChunkedFolderContent GetRemoteContent(StorageInstance& Storage, const Oid& BuildId, const std::vector<std::pair<Oid, std::string>>& BuildParts, std::unique_ptr<ChunkingController>& OutChunkController, @@ -6234,7 +6526,7 @@ namespace { Stopwatch GetBuildPartTimer; const Oid BuildPartId = BuildParts[0].first; const std::string_view BuildPartName = BuildParts[0].second; - CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId); + CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", BuildPartId, BuildPartName, @@ -6248,7 +6540,7 @@ namespace { OutChunkController = CreateChunkingController(ChunkerName, Parameters); } - auto ParseBuildPartManifest = [](BuildStorage& Storage, + auto ParseBuildPartManifest = [](StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, @@ -6274,12 +6566,103 @@ namespace { // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them - Stopwatch GetBlockMetadataTimer; - OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); - ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", - BuildPartId, - NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), - OutBlockDescriptions.size()); + { + Stopwatch GetBlockMetadataTimer; + + std::vector<ChunkBlockDescription> UnorderedList; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; + if (Storage.BuildCacheStorage) + { + std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); + UnorderedList.reserve(CacheBlockMetadatas.size()); + for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); + CacheBlockMetadataIndex++) + { + const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; + ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block"); + } + else + { + UnorderedList.emplace_back(std::move(Description)); + } + } + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + } + } + + if (UnorderedList.size() < BlockRawHashes.size()) + { + std::vector<IoHash> RemainingBlockHashes; + RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size()); + for (const IoHash& BlockRawHash : BlockRawHashes) + { + if (!BlockDescriptionLookup.contains(BlockRawHash)) + { + RemainingBlockHashes.push_back(BlockRawHash); + } + } + CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes); + std::vector<ChunkBlockDescription> RemainingList; + { + CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView(); + std::vector<IoHash> FoundBlockHashes; + std::vector<CbObject> FoundBlockMetadatas; + for (CbFieldView Block : BlocksArray) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); + + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block"); + } + else + { + if (Storage.BuildCacheStorage) + { + UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize()); + Block.CopyTo(MetaBuffer.GetMutableView()); + CbObject BlockMetadata(MetaBuffer.MoveToShared()); + + FoundBlockHashes.push_back(Description.BlockHash); + FoundBlockMetadatas.push_back(BlockMetadata); + } + RemainingList.emplace_back(std::move(Description)); + } + } + if (Storage.BuildCacheStorage && !FoundBlockHashes.empty()) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas); + } + } + + for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = RemainingList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex); + } + UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end()); + } + + OutBlockDescriptions.reserve(BlockDescriptionLookup.size()); + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) + { + OutBlockDescriptions.push_back(std::move(UnorderedList[It->second])); + } + } + + ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", + BuildPartId, + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), + OutBlockDescriptions.size()); + } if (OutBlockDescriptions.size() != BlockRawHashes.size()) { @@ -6292,7 +6675,8 @@ namespace { ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription); std::vector<ChunkBlockDescription> AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); - std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId); + std::vector<ChunkBlockDescription> FoundBlocks = + ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); for (const IoHash& BlockHash : BlockRawHashes) { @@ -6315,7 +6699,7 @@ namespace { } else { - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); + IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); @@ -6381,7 +6765,7 @@ namespace { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; - CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId); + CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, @@ -6486,9 +6870,11 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, - [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, + [&](bool, std::ptrdiff_t) { + ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); + }, AbortFlag); if (AbortFlag) { @@ -6557,7 +6943,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, UpdatedContent, ChunkController, @@ -6609,8 +6995,6 @@ namespace { { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } - - ZEN_CONSOLE("Using cached local state"); } ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); ScanContent = false; @@ -6636,7 +7020,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, CurrentLocalFolderContent, ChunkController, @@ -6670,7 +7054,7 @@ namespace { return LocalContent; } - void DownloadFolder(BuildStorage& Storage, + void DownloadFolder(StorageInstance& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, @@ -6694,7 +7078,7 @@ namespace { std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; std::vector<std::pair<Oid, std::string>> AllBuildParts = - ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); + ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); std::vector<ChunkedFolderContent> PartContents; @@ -6786,7 +7170,7 @@ namespace { { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView()); + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path); FolderContent LocalFolderState; DiskStatistics DiskStats; @@ -7131,6 +7515,10 @@ BuildsCommand::BuildsCommand() "<jsonmetadata>"); }; + auto AddCacheOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + }; + auto AddOutputOptions = [this](cxxopts::Options& Ops) { Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>"); Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); @@ -7169,6 +7557,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_UploadOptions); AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); + AddCacheOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_UploadOptions.add_option("", @@ -7232,6 +7621,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_DownloadOptions); AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); + AddCacheOptions(m_DownloadOptions); m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); @@ -7284,6 +7674,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_TestOptions); AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); + AddCacheOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_TestOptions.add_option("", @@ -7304,6 +7695,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_FetchBlobOptions); AddFileOptions(m_FetchBlobOptions); AddOutputOptions(m_FetchBlobOptions); + AddCacheOptions(m_FetchBlobOptions); m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_FetchBlobOptions .add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>"); @@ -7333,6 +7725,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_MultiTestDownloadOptions); AddFileOptions(m_MultiTestDownloadOptions); AddOutputOptions(m_MultiTestDownloadOptions); + AddCacheOptions(m_MultiTestDownloadOptions); m_MultiTestDownloadOptions .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>"); @@ -7385,6 +7778,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); } } + else if (m_StoragePath.empty()) + { + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + } }; std::unique_ptr<AuthMgr> Auth; @@ -7393,7 +7790,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) auto CreateAuthMgr = [&]() { if (!Auth) { - std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir); + std::filesystem::path DataRoot = m_SystemRootDir.empty() + ? PickDefaultSystemRootDirectory() + : std::filesystem::absolute(StringToPath(m_SystemRootDir)).make_preferred(); if (m_EncryptionKey.empty()) { @@ -7448,7 +7847,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else if (!m_AccessTokenPath.empty()) { - std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath)); + std::string ResolvedAccessToken = + ReadAccessTokenFromFile(std::filesystem::absolute(StringToPath(m_AccessTokenPath)).make_preferred()); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); @@ -7486,15 +7886,63 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseOutputOptions(); + auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats, + BuildStorageCache::Statistics& StorageCacheStats, + const std::filesystem::path& TempPath) -> StorageInstance { + ParseStorageOptions(); + + StorageInstance Result; + + if (!m_BuildsUrl.empty()) + { + ParseAuthOptions(); + Result.BuildStorageHttp = std::make_unique<HttpClient>(m_BuildsUrl, ClientSettings); + ZEN_CONSOLE("Using cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", + m_BuildsUrl, + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket); + Result.BuildStorage = + CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage"); + Result.StorageName = ZEN_CLOUD_STORAGE; + } + else if (!m_StoragePath.empty()) + { + std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred(); + ZEN_CONSOLE("Using folder {}", StoragePath); + Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); + Result.StorageName = fmt::format("Disk {}", StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + if (!m_ZenCacheHost.empty()) + { + Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost, + HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = false, + .AllowResume = true, + .RetryCount = 0}); + if (Result.CacheHttp->Get("/health").IsSuccess()) + { + Result.BuildCacheStorage = + CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache"); + } + else + { + Result.CacheHttp.reset(); + } + } + return Result; + }; + try { if (SubOption == &m_ListOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - CbObject QueryObject; if (m_ListQueryPath.empty()) { @@ -7505,7 +7953,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath); + std::filesystem::path ListQueryPath = std::filesystem::absolute(StringToPath(m_ListQueryPath)).make_preferred(); if (ToLower(ListQueryPath.extension().string()) == ".cbo") { QueryObject = LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath)); @@ -7525,28 +7973,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; - CbObject Response = Storage->ListBuilds(QueryObject); + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderName); + + CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None); if (m_ListResultPath.empty()) { @@ -7556,7 +7987,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListResultPath = StringToPath(m_ListResultPath); + std::filesystem::path ListResultPath = std::filesystem::absolute(StringToPath(m_ListResultPath)).make_preferred(); if (ToLower(ListResultPath.extension().string()) == ".cbo") { MemoryView ResponseView = Response.GetView(); @@ -7575,11 +8006,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_UploadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); @@ -7610,7 +8036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)); if (m_BuildPartName.empty()) { @@ -7645,48 +8071,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); } + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); + BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", - m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", - m_BuildPartName, - Path, - StoragePath, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); CbObject MetaData; if (m_CreateBuild) { if (!m_BuildMetadataPath.empty()) { - std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath); + std::filesystem::path MetadataPath = std::filesystem::absolute(StringToPath(m_BuildMetadataPath)); IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); std::string JsonError; @@ -7713,12 +8111,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, Path, - StringToPath(m_ManifestPath), + std::filesystem::absolute(StringToPath(m_ManifestPath)).make_preferred(), m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, @@ -7735,7 +8133,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7751,11 +8149,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_DownloadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); @@ -7786,37 +8179,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - BuildId, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId, BuildPartIds, m_BuildPartNames, @@ -7835,7 +8205,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7859,8 +8229,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); - std::filesystem::path DiffPath = StringToPath(m_DiffPath); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); + std::filesystem::path DiffPath = std::filesystem::absolute(StringToPath(m_DiffPath)).make_preferred(); DiffFolders(Path, DiffPath, m_OnlyChunked); return AbortFlag ? 11 : 0; } @@ -7872,10 +8242,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); // m_StoragePath = "D:\\buildstorage"; // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient"; // std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587", @@ -7886,34 +8252,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // "07d3964f919d577a321a1fdd", // "07d396a6ce875004e16b9528"}; - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", - FormatArray<std::string>(m_BuildIds, " "sv), - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) @@ -7923,7 +8267,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); } - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId, {}, {}, @@ -7945,36 +8289,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_TestOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); m_BuildId = Oid::NewOid().ToString(); m_BuildPartName = Path.filename().string(); m_BuildPartId = Oid::NewOid().ToString(); m_CreateBuild = true; - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::filesystem::path StoragePath = StringToPath(m_StoragePath); + std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred(); if (m_BuildsUrl.empty() && StoragePath.empty()) { StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred(); CreateDirectories(StoragePath); CleanDirectory(StoragePath, {}); + m_StoragePath = StoragePath.generic_string(); } auto _ = MakeGuard([&]() { if (m_BuildsUrl.empty() && StoragePath.empty()) @@ -7983,33 +8320,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!StoragePath.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - Path, - StoragePath, - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); auto MakeMetaData = [](const Oid& BuildId) -> CbObject { CbObjectWriter BuildMetaDataWriter; @@ -8032,7 +8346,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, @@ -8052,7 +8366,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -8064,7 +8378,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -8115,7 +8429,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SourceSize > 256) { Work.ScheduleWork( - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), [SourceSize, FilePath](std::atomic<bool>&) { if (!AbortFlag) { @@ -8168,7 +8482,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -8187,7 +8501,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId2, BuildPartId2, m_BuildPartName, @@ -8206,7 +8520,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8214,7 +8528,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId2, {BuildPartId2}, {}, @@ -8230,7 +8544,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId2, {BuildPartId2}, {}, @@ -8250,11 +8564,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_FetchBlobOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_BlobHash.empty()) { throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); @@ -8266,44 +8575,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); } - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); - } + const Oid BuildId = Oid::FromHexString(m_BuildId); - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); - std::filesystem::path Path = StringToPath(m_Path); + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); + ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { return 11; @@ -8317,10 +8600,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_ValidateBuildPartOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); + // HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BuildsUrl.empty() && m_StoragePath.empty()) { @@ -8342,39 +8622,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); + BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + BuildStorageCache::Statistics StorageCacheStats; - std::filesystem::path Path = StringToPath(m_Path); + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); ValidateStatistics ValidateStats; DownloadStatistics DownloadStats; - ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); return AbortFlag ? 13 : 0; } |