diff options
| author | Zousar Shaker <[email protected]> | 2025-03-26 17:01:30 -0600 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-26 17:01:30 -0600 |
| commit | 56af48235a5394b2906e9b347d43404394a4e756 (patch) | |
| tree | 05530b3da98773794320e5a2ab507ec879bf6e9d /src | |
| parent | Descriptive type conversion messages (diff) | |
| parent | zen build cache service (#318) (diff) | |
| download | zen-56af48235a5394b2906e9b347d43404394a4e756.tar.xz zen-56af48235a5394b2906e9b347d43404394a4e756.zip | |
Merge branch 'main' into zs/ui-show-cook-artifacts
Diffstat (limited to 'src')
33 files changed, 3984 insertions, 947 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 889ccef0b..b2ad579f1 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -20,6 +20,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenutil/buildstoragecache.h> #include <zenutil/chunkblock.h> #include <zenutil/chunkedcontent.h> #include <zenutil/chunkedfile.h> @@ -51,6 +52,8 @@ ZEN_THIRD_PARTY_INCLUDES_END #define EXTRA_VERIFY 0 +#define ZEN_CLOUD_STORAGE "Cloud Storage" + namespace zen { namespace { static std::atomic<bool> AbortFlag = false; @@ -87,6 +90,8 @@ namespace { const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; + const bool SingleThreaded = false; + const std::string ZenFolderName = ".zen"; const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); @@ -204,22 +209,27 @@ namespace { { try { - std::filesystem::remove(LocalFilePath); - } - catch (const std::exception&) - { - // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time - Sleep(200); - try - { - std::filesystem::remove(LocalFilePath); - } - catch (const std::exception& Ex) + std::error_code Ec; + std::filesystem::remove(LocalFilePath, Ec); + if (Ec) { - ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what()); - CleanWipe = false; + // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time + Ec.clear(); + if (std::filesystem::exists(LocalFilePath, Ec) || Ec) + { + Sleep(200); + if (std::filesystem::exists(LocalFilePath)) + { + std::filesystem::remove(LocalFilePath); + } + } } } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing file {}. 
Reason: {}", LocalFilePath, Ex.what()); + CleanWipe = false; + } } for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) @@ -425,7 +435,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); @@ -439,7 +449,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, ChunkController, @@ -603,11 +613,9 @@ namespace { struct WriteChunkStatistics { - std::atomic<uint32_t> ChunkCountWritten = 0; - std::atomic<uint64_t> ChunkBytesWritten = 0; - uint64_t DownloadTimeUs = 0; - uint64_t WriteTimeUs = 0; - uint64_t WriteChunksElapsedWallTimeUs = 0; + uint64_t DownloadTimeUs = 0; + uint64_t WriteTimeUs = 0; + uint64_t WriteChunksElapsedWallTimeUs = 0; }; struct RebuildFolderStateStatistics @@ -626,6 +634,15 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; + struct StorageInstance + { + std::unique_ptr<HttpClient> BuildStorageHttp; + std::unique_ptr<BuildStorage> BuildStorage; + std::string StorageName; + std::unique_ptr<HttpClient> CacheHttp; + std::unique_ptr<BuildStorageCache> BuildCacheStorage; + }; + std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, const std::span<const uint32_t> LocalChunkOrder, const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, @@ -772,7 +789,7 @@ namespace { std::span<const uint32_t> ChunkCounts, std::span<const IoHash> LocalChunkHashes, std::span<const uint64_t> LocalChunkRawSizes, - std::vector<uint32_t> AbsoluteChunkOrders, + const std::vector<uint32_t>& AbsoluteChunkOrders, const std::span<const uint32_t> 
LooseLocalChunkIndexes, const std::span<IoHash> BlockHashes) { @@ -1444,7 +1461,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(),// + NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { ZEN_TRACE_CPU("DownloadLargeBlob_Work"); if (!AbortFlag) @@ -1513,15 +1530,16 @@ namespace { } ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments); + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments)); if (VerifyBlockDescriptions.size() != BlockAttachments.size()) { throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", BlockAttachments.size() - VerifyBlockDescriptions.size())); } - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& VerifyPool = SingleThreaded ? 
GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ParallellWork Work(AbortFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -1881,7 +1899,7 @@ namespace { void GenerateBuildBlocks(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, - BuildStorage& Storage, + StorageInstance& Storage, const Oid& BuildId, const std::vector<std::vector<uint32_t>>& NewBlockChunks, GeneratedBlocks& OutBlocks, @@ -1904,9 +1922,8 @@ namespace { RwLock Lock; - WorkerThreadPool& GenerateBlobsPool = - GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// - WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// + WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; @@ -2005,21 +2022,35 @@ namespace { const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - Storage.PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } + + Storage.BuildStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); UploadStats.BlocksBytes += CompressedBlockSize; + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(CompressedBlockSize), 
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - Storage.PutBlockMetadata(BuildId, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(BlockMetaData.GetSize())); OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2074,7 +2105,7 @@ namespace { } } - void UploadPartBlobs(BuildStorage& Storage, + void UploadPartBlobs(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& Content, @@ -2092,8 +2123,8 @@ namespace { { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& UploadChunkPool = SingleThreaded ? 
GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; @@ -2177,18 +2208,26 @@ namespace { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + } + Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); UploadedBlockSize += PayloadSize; UploadStats.BlocksBytes += PayloadSize; - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(BlockMetaData.GetSize())); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2214,12 +2253,17 @@ namespace { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); - ; + + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + } + if (PayloadSize >= LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); 
UploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob( + std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob( BuildId, RawHash, ZenContentType::kCompressedBinary, @@ -2232,7 +2276,7 @@ namespace { PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, - [&, RawSize](uint64_t SentBytes, bool IsComplete) { + [&, Payload, RawSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) @@ -2264,7 +2308,7 @@ namespace { else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; @@ -2303,7 +2347,7 @@ namespace { if (!AbortFlag) { Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool() + SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2362,7 +2406,7 @@ namespace { { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, + SingleThreaded ? 
GetSyncWorkerPool() : ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2598,7 +2642,7 @@ namespace { return FilteredReuseBlockIndexes; }; - void UploadFolder(BuildStorage& Storage, + void UploadFolder(StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, @@ -2654,7 +2698,7 @@ namespace { ZEN_TRACE_CPU("CreateBuild"); Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); + CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); Result.PayloadSize = MetaData.GetSize(); @@ -2663,7 +2707,7 @@ namespace { { ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; - CbObject Build = Storage.GetBuild(BuildId); + CbObject Build = Storage.BuildStorage->GetBuild(BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); Result.PayloadSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) @@ -2681,7 +2725,7 @@ namespace { { ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; - Result.KnownBlocks = Storage.FindBlocks(BuildId); + Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); @@ -2796,10 +2840,10 @@ namespace { } return true; }, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 
5000 : 200, [&](bool, std::ptrdiff_t) { - ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); + ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); } @@ -2855,7 +2899,7 @@ namespace { FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, *ChunkController, @@ -2985,7 +3029,7 @@ namespace { (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) : 0.0; ZEN_CONSOLE( - "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n" + "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" @@ -3204,7 +3248,8 @@ namespace { } Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), @@ -3289,7 +3334,7 @@ namespace { while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); + std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. 
{} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); @@ -3304,7 +3349,7 @@ namespace { if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; - Storage.FinalizeBuild(BuildId); + Storage.BuildStorage->FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } @@ -3321,7 +3366,13 @@ namespace { { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; @@ -3340,7 +3391,7 @@ namespace { DownloadStatistics ValidateDownloadStats; if (PostUploadVerify && !AbortFlag) { - ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); } ZEN_CONSOLE_VERBOSE( @@ -3570,7 +3621,7 @@ namespace { ValidateInfo); - Storage.PutBuildPartStats( + Storage.BuildStorage->PutBuildPartStats( BuildId, BuildPartId, {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())}, @@ -3597,7 +3648,7 @@ namespace { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& VerifyPool = SingleThreaded ? 
GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ParallellWork Work(AbortFlag); @@ -3873,6 +3924,22 @@ namespace { return ChunkTargetPtrs; }; + uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex) + { + uint64_t WriteCount = 0; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + WriteCount++; + } + } + return WriteCount; + }; + void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) { ZEN_TRACE_CPU("FinalizeChunkSequence"); @@ -3892,8 +3959,39 @@ namespace { } } + void VerifySequence(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + uint32_t RemoteSequenceIndex) + { + ZEN_TRACE_CPU("VerifySequence"); + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex]; + IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + const uint64_t VerifySize = VerifyBuffer.GetSize(); + if (VerifySize != ExpectedSize) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", + SequenceRawHash, + VerifySize, + ExpectedSize)); + } + ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written 
chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } + } + void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, std::span<const uint32_t> RemoteSequenceIndexes, ParallellWork& Work, WorkerThreadPool& VerifyPool) @@ -3908,40 +4006,24 @@ namespace { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork( VerifyPool, - [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); + if (!AbortFlag) { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(TargetFolder, SequenceRawHash); } - FinalizeChunkSequence(TargetFolder, SequenceRawHash); } }, Work.DefaultErrorFunction()); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, 
SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); - } - } FinalizeChunkSequence(TargetFolder, SequenceRawHash); } @@ -3985,8 +4067,7 @@ namespace { const BlockWriteOps& Ops, ParallellWork& Work, WorkerThreadPool& VerifyPool, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); { @@ -4017,12 +4098,6 @@ namespace { FileOffset, RemoteContent.RawSizes[PathIndex]); } - WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); - WriteChunkStats.ChunkBytesWritten += - std::accumulate(Ops.ChunkBuffers.begin(), - Ops.ChunkBuffers.end(), - uint64_t(0), - [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); }); } if (!AbortFlag) { @@ -4036,7 +4111,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool); + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); } } @@ -4162,8 +4237,7 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -4197,8 +4271,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4222,8 +4295,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4240,8 +4312,7 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> 
RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); @@ -4267,8 +4338,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } else @@ -4355,8 +4425,7 @@ namespace { void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); @@ -4390,7 +4459,6 @@ namespace { DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { - WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize(); DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { @@ -4424,7 +4492,7 @@ namespace { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. 
Reason: {}", SequenceRawHash, Ec.message())); } - WriteChunkStats.ChunkCountWritten++; + // WriteChunkStats.ChunkCountWritten++; } bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, @@ -4433,8 +4501,7 @@ namespace { const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, IoBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); @@ -4444,7 +4511,7 @@ namespace { { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats); + StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); } else { @@ -4459,8 +4526,6 @@ namespace { ChunkTargetPtrs, CompositeBuffer(std::move(Chunk)), OpenFileCache); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += ChunkRawSize; return true; } } @@ -4479,8 +4544,7 @@ namespace { std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4527,7 +4591,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, SequenceIndexChunksLeftToWriteCounters, CompressedChunkPath, @@ -4565,8 +4629,7 @@ namespace { ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), - DiskStats, - WriteChunkStats); + DiskStats); if (!AbortFlag) { WritePartsComplete++; @@ -4581,7 +4644,12 @@ namespace { 
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); } else { @@ -4593,7 +4661,7 @@ namespace { Work.DefaultErrorFunction()); }; - void UpdateFolder(BuildStorage& Storage, + void UpdateFolder(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, const std::uint64_t LargeAttachmentSize, @@ -4667,8 +4735,8 @@ namespace { if (SequenceSize == CacheDirContent.FileSizes[Index]) { CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - CacheMappingStats.CacheSequenceHashesCount += SequenceSize; - CacheMappingStats.CacheSequenceHashesByteCount++; + CacheMappingStats.CacheSequenceHashesCount++; + CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; continue; } } @@ -4869,21 +4937,17 @@ namespace { NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); } - uint32_t ChunkCountToWrite = 0; + uint64_t BytesToWrite = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); + if (ChunkWriteCount > 0) { - ChunkCountToWrite++; - } - else - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - if (!ChunkTargetPtrs.empty()) + BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; - 
ChunkCountToWrite++; } } } @@ -4900,8 +4964,8 @@ namespace { FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -4922,7 +4986,7 @@ namespace { const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); continue; } bool NeedsCopy = true; @@ -4933,7 +4997,7 @@ namespace { if (ChunkTargetPtrs.empty()) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); } else { @@ -5025,6 +5089,10 @@ namespace { uint32_t CurrentOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) @@ -5064,6 +5132,7 @@ namespace { } ZEN_ASSERT(!BlockRanges.empty()); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; auto It = BlockRanges.begin(); CollapsedBlockRanges.push_back(*It++); @@ -5085,10 +5154,87 @@ namespace { ++It; } - 
TotalRequestCount += CollapsedBlockRanges.size(); - TotalPartWriteCount += CollapsedBlockRanges.size(); + const std::uint64_t WantedSize = std::accumulate( + CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + ZEN_ASSERT(WantedSize <= TotalBlockSize); + if (WantedSize > ((TotalBlockSize * 95) / 100)) + { + ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1) + { + ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16)) + { + ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48)) + { + ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && 
(CollapsedBlockRanges.size() > 64)) + { + ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else + { + if (WantedSize > ((TotalBlockSize * 5) / 10)) + { + ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block", + (WantedSize * 100) / TotalBlockSize, + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + } + TotalRequestCount += CollapsedBlockRanges.size(); + TotalPartWriteCount += CollapsedBlockRanges.size(); - BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + } } else { @@ -5101,10 +5247,69 @@ namespace { } else { - ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } + struct BlobsExistsResult + { + tsl::robin_set<IoHash> ExistingBlobs; + uint64_t ElapsedTimeMs = 0; + }; + + BlobsExistsResult ExistsResult; + + if (Storage.BuildCacheStorage) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + tsl::robin_set<IoHash> BlobHashesSet; + + BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size()); + for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + } + for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks) + { + const uint32_t BlockIndex = BlockRange.BlockIndex; + const ChunkBlockDescription& BlockDescription = 
BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + for (uint32_t BlockIndex : FullBlockWorks) + { + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + + if (!BlobHashesSet.empty()) + { + const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (CacheExistsResult[BlobIndex].HasBody) + { + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } + } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty()) + { + ZEN_CONSOLE("Found {} out of {} needed blobs in remote cache in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } + } + } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { if (AbortFlag) @@ -5119,7 +5324,7 @@ namespace { const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; Work.ScheduleWork( - WritePool, // NetworkPool, // GetSyncWorkerPool(),// + WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { if (!AbortFlag) { @@ -5152,151 +5357,202 @@ namespace { } } } - if (!ExistingCompressedChunkPath.empty()) + if (!AbortFlag) + { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Path, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - 
RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + WritePool, + [&Path, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats, - WriteChunkStats); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]; - WritePartsComplete++; + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + bool 
NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); - } + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } - std::filesystem::remove(CompressedChunkPath); + std::filesystem::remove(CompressedChunkPath); - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } + } + } + }, + Work.DefaultErrorFunction()); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &Storage, + BuildId, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + FilteredDownloadedBytesPerSecond.Start(); + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - CompletedSequences, - Work, - WritePool); + 
ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob(*Storage.BuildStorage, + Path / ZenTempDownloadFolderName, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!AbortFlag) + { + AsyncWriteDownloadedChunk( + Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + }); } else { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + IoBuffer BuildBlob; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash)) + { + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); + } + if (!BuildBlob) + { + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); + } + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + 
SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } } } - } - }, - Work.DefaultErrorFunction()); - } - else - { - FilteredDownloadedBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) - { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob(Storage, - Path / ZenTempDownloadFolderName, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - WriteChunkStats); - }); - } - else - { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - - IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash); - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - 
WriteChunkStats); + }, + Work.DefaultErrorFunction()); } } } @@ -5312,7 +5568,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, CopyDataIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5439,16 +5695,6 @@ namespace { ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); - for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++) - { - const WriteOp& WrittenOp = WriteOps[WrittenOpIndex]; - if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second) - { - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex]; - } - } CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? @@ -5469,10 +5715,13 @@ namespace { } VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, + RemoteLookup, CompletedChunkSequences, Work, WritePool); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + ZEN_CONSOLE_VERBOSE("Copied {} from {}", + NiceBytes(CacheLocalFileBytesRead), + LocalContent.Paths[LocalPathIndex]); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) @@ -5492,7 +5741,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(), // WritePool, + WritePool, [&, BlockIndex](std::atomic<bool>&) mutable { if (!AbortFlag) { @@ -5509,27 +5758,29 @@ namespace { fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} 
is malformed", BlockDescription.BlockHash)); - } - WritePartsComplete++; - std::filesystem::remove(BlockChunkPath); - if (WritePartsComplete == TotalPartWriteCount) + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + std::filesystem::remove(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } }, @@ -5546,7 +5797,7 @@ namespace { ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; Work.ScheduleWork( - NetworkPool, // NetworkPool, // GetSyncWorkerPool() + NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) { if (!AbortFlag) { @@ -5555,131 +5806,148 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = - Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); } - uint64_t BlockSize = BlockBuffer.GetSize(); 
- DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BlockBuffer) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + if (!AbortFlag) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + FilteredDownloadedBytesPerSecond.Stop(); + } - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + 
BlockChunkPath = Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, // GetSyncWorkerPool(), - [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( - std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = 
std::move(BlockBuffer)]( + std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + 
RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -5693,7 +5961,7 @@ namespace { break; } Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + NetworkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5702,133 +5970,152 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); + + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + } if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (BlockBuffer && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlockBuffer))); + } } - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - 
DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BlockBuffer) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } + if (!AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - std::filesystem::path BlockChunkPath; + std::filesystem::path BlockChunkPath; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + // Check if the dowloaded block is file based and we can move it directly without rewriting it { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + 
BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + 
&BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw 
std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } - WritePartsComplete++; + WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -5840,27 +6127,28 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load()); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", + std::string DownloadRateString = + (DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string Details = fmt::format("{}/{} ({}{}) downloaded. 
{}/{} ({}B/s) written.", DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, NiceBytes(DownloadedBytes), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - WriteChunkStats.ChunkCountWritten.load(), - ChunkCountToWrite, + DownloadRateString, NiceBytes(DiskStats.WriteByteCount.load()), + NiceBytes(BytesToWrite), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); - WriteProgressBar.UpdateState( - {.Task = "Writing chunks ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())}, - false); + WriteProgressBar.UpdateState({.Task = "Writing chunks ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(BytesToWrite), + .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())}, + false); }); } @@ -5981,7 +6269,7 @@ namespace { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); Stopwatch Timer; - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -6019,7 +6307,7 @@ namespace { } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { if (!AbortFlag) { @@ -6070,6 +6358,10 @@ namespace { TargetsComplete++; while (TargetOffset < (BaseTargetOffset + TargetCount)) { + if (AbortFlag) + { + break; + } ZEN_TRACE_CPU("FinalizeTree_Copy"); ZEN_ASSERT(Targets[TargetOffset].first == RawHash); @@ -6140,17 +6432,15 @@ namespace { std::vector<std::pair<Oid, std::string>> Result; { Stopwatch GetBuildTimer; - - std::vector<std::pair<Oid, std::string>> AvailableParts; - - CbObject BuildObject = Storage.GetBuild(BuildId); - - ZEN_CONSOLE("GetBuild took {}. 
Name: '{}', Payload size: {}", + CbObject BuildObject = Storage.GetBuild(BuildId); + ZEN_CONSOLE("GetBuild took {}. Name: '{}' ({}, {}), Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), - BuildObject["BuildName"sv].AsString(), + BuildObject["name"sv].AsString(), + BuildObject["type"sv].AsString(), + BuildObject["Configuration"sv].AsString(), NiceBytes(BuildObject.GetSize())); - ZEN_DEBUG("Build object: {}", BuildObject); + ZEN_CONSOLE_VERBOSE("Build object: {}", BuildObject); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) @@ -6160,6 +6450,8 @@ namespace { OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); + std::vector<std::pair<Oid, std::string>> AvailableParts; + for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); @@ -6221,7 +6513,7 @@ namespace { return Result; } - ChunkedFolderContent GetRemoteContent(BuildStorage& Storage, + ChunkedFolderContent GetRemoteContent(StorageInstance& Storage, const Oid& BuildId, const std::vector<std::pair<Oid, std::string>>& BuildParts, std::unique_ptr<ChunkingController>& OutChunkController, @@ -6234,7 +6526,7 @@ namespace { Stopwatch GetBuildPartTimer; const Oid BuildPartId = BuildParts[0].first; const std::string_view BuildPartName = BuildParts[0].second; - CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId); + CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. 
Payload size: {}", BuildPartId, BuildPartName, @@ -6248,7 +6540,7 @@ namespace { OutChunkController = CreateChunkingController(ChunkerName, Parameters); } - auto ParseBuildPartManifest = [](BuildStorage& Storage, + auto ParseBuildPartManifest = [](StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, @@ -6274,12 +6566,103 @@ namespace { // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them - Stopwatch GetBlockMetadataTimer; - OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); - ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", - BuildPartId, - NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), - OutBlockDescriptions.size()); + { + Stopwatch GetBlockMetadataTimer; + + std::vector<ChunkBlockDescription> UnorderedList; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; + if (Storage.BuildCacheStorage) + { + std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); + UnorderedList.reserve(CacheBlockMetadatas.size()); + for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); + CacheBlockMetadataIndex++) + { + const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; + ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block"); + } + else + { + UnorderedList.emplace_back(std::move(Description)); + } + } + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + } + } + + if (UnorderedList.size() < 
BlockRawHashes.size()) + { + std::vector<IoHash> RemainingBlockHashes; + RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size()); + for (const IoHash& BlockRawHash : BlockRawHashes) + { + if (!BlockDescriptionLookup.contains(BlockRawHash)) + { + RemainingBlockHashes.push_back(BlockRawHash); + } + } + CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes); + std::vector<ChunkBlockDescription> RemainingList; + { + CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView(); + std::vector<IoHash> FoundBlockHashes; + std::vector<CbObject> FoundBlockMetadatas; + for (CbFieldView Block : BlocksArray) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); + + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block"); + } + else + { + if (Storage.BuildCacheStorage) + { + UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize()); + Block.CopyTo(MetaBuffer.GetMutableView()); + CbObject BlockMetadata(MetaBuffer.MoveToShared()); + + FoundBlockHashes.push_back(Description.BlockHash); + FoundBlockMetadatas.push_back(BlockMetadata); + } + RemainingList.emplace_back(std::move(Description)); + } + } + if (Storage.BuildCacheStorage && !FoundBlockHashes.empty()) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas); + } + } + + for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = RemainingList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex); + } + UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end()); + } + + OutBlockDescriptions.reserve(BlockDescriptionLookup.size()); + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = 
BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) + { + OutBlockDescriptions.push_back(std::move(UnorderedList[It->second])); + } + } + + ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", + BuildPartId, + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), + OutBlockDescriptions.size()); + } if (OutBlockDescriptions.size() != BlockRawHashes.size()) { @@ -6292,7 +6675,8 @@ namespace { ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription); std::vector<ChunkBlockDescription> AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); - std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId); + std::vector<ChunkBlockDescription> FoundBlocks = + ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); for (const IoHash& BlockHash : BlockRawHashes) { @@ -6315,7 +6699,7 @@ namespace { } else { - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); + IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); @@ -6381,7 +6765,7 @@ namespace { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; - CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId); + CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, @@ -6486,9 +6870,11 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 
5000 : 200, - [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, + [&](bool, std::ptrdiff_t) { + ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); + }, AbortFlag); if (AbortFlag) { @@ -6557,7 +6943,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, UpdatedContent, ChunkController, @@ -6609,8 +6995,6 @@ namespace { { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } - - ZEN_CONSOLE("Using cached local state"); } ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); ScanContent = false; @@ -6636,7 +7020,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? 
GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), Path, CurrentLocalFolderContent, ChunkController, @@ -6670,7 +7054,7 @@ namespace { return LocalContent; } - void DownloadFolder(BuildStorage& Storage, + void DownloadFolder(StorageInstance& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, @@ -6694,7 +7078,7 @@ namespace { std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; std::vector<std::pair<Oid, std::string>> AllBuildParts = - ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); + ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); std::vector<ChunkedFolderContent> PartContents; @@ -6786,7 +7170,7 @@ namespace { { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView()); + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path); FolderContent LocalFolderState; DiskStatistics DiskStats; @@ -7131,6 +7515,10 @@ BuildsCommand::BuildsCommand() "<jsonmetadata>"); }; + auto AddCacheOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + }; + auto AddOutputOptions = [this](cxxopts::Options& Ops) { Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>"); Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); @@ -7169,6 +7557,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_UploadOptions); AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); + AddCacheOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); 
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_UploadOptions.add_option("", @@ -7232,6 +7621,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_DownloadOptions); AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); + AddCacheOptions(m_DownloadOptions); m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); @@ -7284,6 +7674,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_TestOptions); AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); + AddCacheOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_TestOptions.add_option("", @@ -7304,6 +7695,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_FetchBlobOptions); AddFileOptions(m_FetchBlobOptions); AddOutputOptions(m_FetchBlobOptions); + AddCacheOptions(m_FetchBlobOptions); m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_FetchBlobOptions .add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>"); @@ -7333,6 +7725,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_MultiTestDownloadOptions); AddFileOptions(m_MultiTestDownloadOptions); AddOutputOptions(m_MultiTestDownloadOptions); + AddCacheOptions(m_MultiTestDownloadOptions); m_MultiTestDownloadOptions .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>"); 
@@ -7385,6 +7778,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); } } + else if (m_StoragePath.empty()) + { + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + } }; std::unique_ptr<AuthMgr> Auth; @@ -7393,7 +7790,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) auto CreateAuthMgr = [&]() { if (!Auth) { - std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir); + std::filesystem::path DataRoot = m_SystemRootDir.empty() + ? PickDefaultSystemRootDirectory() + : std::filesystem::absolute(StringToPath(m_SystemRootDir)).make_preferred(); if (m_EncryptionKey.empty()) { @@ -7448,7 +7847,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else if (!m_AccessTokenPath.empty()) { - std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath)); + std::string ResolvedAccessToken = + ReadAccessTokenFromFile(std::filesystem::absolute(StringToPath(m_AccessTokenPath)).make_preferred()); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); @@ -7486,15 +7886,63 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseOutputOptions(); + auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats, + BuildStorageCache::Statistics& StorageCacheStats, + const std::filesystem::path& TempPath) -> StorageInstance { + ParseStorageOptions(); + + StorageInstance Result; + + if (!m_BuildsUrl.empty()) + { + ParseAuthOptions(); + Result.BuildStorageHttp = std::make_unique<HttpClient>(m_BuildsUrl, ClientSettings); + ZEN_CONSOLE("Using cloud endpoint {}. SessionId: '{}'. 
Namespace '{}', Bucket '{}'", + m_BuildsUrl, + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket); + Result.BuildStorage = + CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage"); + Result.StorageName = ZEN_CLOUD_STORAGE; + } + else if (!m_StoragePath.empty()) + { + std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred(); + ZEN_CONSOLE("Using folder {}", StoragePath); + Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); + Result.StorageName = fmt::format("Disk {}", StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + if (!m_ZenCacheHost.empty()) + { + Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost, + HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = false, + .AllowResume = true, + .RetryCount = 0}); + if (Result.CacheHttp->Get("/health").IsSuccess()) + { + Result.BuildCacheStorage = + CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache"); + } + else + { + Result.CacheHttp.reset(); + } + } + return Result; + }; + try { if (SubOption == &m_ListOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - CbObject QueryObject; if (m_ListQueryPath.empty()) { @@ -7505,7 +7953,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath); + std::filesystem::path ListQueryPath = std::filesystem::absolute(StringToPath(m_ListQueryPath)).make_preferred(); if (ToLower(ListQueryPath.extension().string()) == ".cbo") { QueryObject = 
LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath)); @@ -7525,28 +7973,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; - CbObject Response = Storage->ListBuilds(QueryObject); + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderName); + + CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None); if (m_ListResultPath.empty()) { @@ -7556,7 +7987,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListResultPath = StringToPath(m_ListResultPath); + std::filesystem::path ListResultPath = std::filesystem::absolute(StringToPath(m_ListResultPath)).make_preferred(); if (ToLower(ListResultPath.extension().string()) == ".cbo") { MemoryView ResponseView = Response.GetView(); @@ -7575,11 +8006,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_UploadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - 
HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); @@ -7610,7 +8036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)); if (m_BuildPartName.empty()) { @@ -7645,48 +8071,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); } + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); + BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", - m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", - m_BuildPartName, - Path, - StoragePath, - GeneratedBuildId ? 
"Generated " : "", - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); CbObject MetaData; if (m_CreateBuild) { if (!m_BuildMetadataPath.empty()) { - std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath); + std::filesystem::path MetadataPath = std::filesystem::absolute(StringToPath(m_BuildMetadataPath)); IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); std::string JsonError; @@ -7713,12 +8111,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, Path, - StringToPath(m_ManifestPath), + std::filesystem::absolute(StringToPath(m_ManifestPath)).make_preferred(), m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, @@ -7735,7 +8133,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7751,11 +8149,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_DownloadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); 
@@ -7786,37 +8179,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - BuildId, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId, BuildPartIds, m_BuildPartNames, @@ -7835,7 +8205,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7859,8 +8229,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw 
zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); - std::filesystem::path DiffPath = StringToPath(m_DiffPath); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); + std::filesystem::path DiffPath = std::filesystem::absolute(StringToPath(m_DiffPath)).make_preferred(); DiffFolders(Path, DiffPath, m_OnlyChunked); return AbortFlag ? 11 : 0; } @@ -7872,10 +8242,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); // m_StoragePath = "D:\\buildstorage"; // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient"; // std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587", @@ -7886,34 +8252,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // "07d3964f919d577a321a1fdd", // "07d396a6ce875004e16b9528"}; - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. 
Namespace '{}', Bucket '{}'", - FormatArray<std::string>(m_BuildIds, " "sv), - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) @@ -7923,7 +8267,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); } - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId, {}, {}, @@ -7945,36 +8289,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_TestOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); m_BuildId = Oid::NewOid().ToString(); m_BuildPartName = Path.filename().string(); m_BuildPartId = Oid::NewOid().ToString(); m_CreateBuild = true; 
- BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::filesystem::path StoragePath = StringToPath(m_StoragePath); + std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred(); if (m_BuildsUrl.empty() && StoragePath.empty()) { StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred(); CreateDirectories(StoragePath); CleanDirectory(StoragePath, {}); + m_StoragePath = StoragePath.generic_string(); } auto _ = MakeGuard([&]() { if (m_BuildsUrl.empty() && StoragePath.empty()) @@ -7983,33 +8320,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!StoragePath.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", - m_BuildPartName.empty() ? 
m_BuildPartId : m_BuildPartName, - Path, - StoragePath, - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); auto MakeMetaData = [](const Oid& BuildId) -> CbObject { CbObjectWriter BuildMetaDataWriter; @@ -8032,7 +8346,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, @@ -8052,7 +8366,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -8064,7 +8378,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { 
ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -8115,7 +8429,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SourceSize > 256) { Work.ScheduleWork( - GetMediumWorkerPool(EWorkloadType::Burst), + SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), [SourceSize, FilePath](std::atomic<bool>&) { if (!AbortFlag) { @@ -8168,7 +8482,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -8187,7 +8501,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId2, BuildPartId2, m_BuildPartName, @@ -8206,7 +8520,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8214,7 +8528,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, 
BuildId2, {BuildPartId2}, {}, @@ -8230,7 +8544,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId2, {BuildPartId2}, {}, @@ -8250,11 +8564,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_FetchBlobOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_BlobHash.empty()) { throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); @@ -8266,44 +8575,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); } - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); - } + const Oid BuildId = Oid::FromHexString(m_BuildId); - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); - std::filesystem::path Path = StringToPath(m_Path); + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. 
Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); + ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { return 11; @@ -8317,10 +8600,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_ValidateBuildPartOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); + // HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BuildsUrl.empty() && m_StoragePath.empty()) { @@ -8342,39 +8622,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } + std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred(); + BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + BuildStorageCache::Statistics StorageCacheStats; - std::filesystem::path Path = StringToPath(m_Path); + 
StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); ValidateStatistics ValidateStats; DownloadStatistics DownloadStats; - ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); return AbortFlag ? 
13 : 0; } diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 1634975c1..b5af236e1 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -40,6 +40,9 @@ private: std::string m_StoragePath; bool m_WriteMetadataAsJson = false; + // cache + std::string m_ZenCacheHost; + std::string m_BuildId; bool m_CreateBuild = false; std::string m_BuildMetadataPath; diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 30711a432..fe5232d89 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -45,6 +45,7 @@ namespace detail { TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} ~TempPayloadFile() { + ZEN_TRACE_CPU("TempPayloadFile::Close"); try { if (m_FileHandle) @@ -87,6 +88,7 @@ namespace detail { std::error_code Open(const std::filesystem::path& TempFolderPath) { + ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | @@ -131,6 +133,7 @@ namespace detail { std::error_code Write(std::string_view DataString) { + ZEN_TRACE_CPU("TempPayloadFile::Write"); const uint8_t* DataPtr = (const uint8_t*)DataString.data(); size_t DataSize = DataString.size(); if (DataSize >= CacheBufferSize) @@ -165,6 +168,7 @@ namespace detail { IoBuffer DetachToIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -180,6 +184,7 @@ namespace detail { IoBuffer BorrowIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -193,6 +198,7 @@ namespace detail { uint64_t GetSize() const { return m_WriteOffset; } void ResetWritePos(uint64_t WriteOffset) { + ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); Flush(); m_WriteOffset = WriteOffset; } @@ -200,6 +206,7 @@ namespace detail { 
private: std::error_code Flush() { + ZEN_TRACE_CPU("TempPayloadFile::Flush"); if (m_CacheBufferOffset == 0) { return {}; @@ -211,6 +218,7 @@ namespace detail { std::error_code AppendData(const void* Data, uint64_t Size) { + ZEN_TRACE_CPU("TempPayloadFile::AppendData"); ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; @@ -314,7 +322,11 @@ CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffe const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); if (HttpResponse.error) { - ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse); + if (HttpResponse.error.code != cpr::ErrorCode::OPERATION_TIMEDOUT && + HttpResponse.error.code != cpr::ErrorCode::CONNECTION_FAILURE && HttpResponse.error.code != cpr::ErrorCode::REQUEST_CANCELLED) + { + ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse); + } // Client side failure code return HttpClient::Response{ @@ -376,6 +388,7 @@ ShouldRetry(const cpr::Response& Response) static bool ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile) { + ZEN_TRACE_CPU("ValidatePayload"); IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? 
PayloadFile->BorrowIoBuffer() : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); @@ -535,12 +548,14 @@ struct HttpClient::Impl : public RefCounted inline cpr::Session* operator->() const { return CprSession; } inline cpr::Response Get() { + ZEN_TRACE_CPU("HttpClient::Impl::Get"); cpr::Response Result = CprSession->Get(); ZEN_TRACE("GET {}", Result); return Result; } inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Download"); if (Header) { CprSession->SetHeaderCallback(std::move(Header.value())); @@ -553,12 +568,14 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Head() { + ZEN_TRACE_CPU("HttpClient::Impl::Head"); cpr::Response Result = CprSession->Head(); ZEN_TRACE("HEAD {}", Result); return Result; } inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Put"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -570,6 +587,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Post"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -581,6 +599,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Delete() { + ZEN_TRACE_CPU("HttpClient::Impl::Delete"); cpr::Response Result = CprSession->Delete(); ZEN_TRACE("DELETE {}", Result); return Result; @@ -620,6 +639,7 @@ HttpClient::Impl::Impl(LoggerRef Log) : m_Log(Log) HttpClient::Impl::~Impl() { + ZEN_TRACE_CPU("HttpClient::Impl::~Impl"); m_SessionLock.WithExclusiveLock([&] { for (auto CprSession : m_Sessions) { @@ -638,6 +658,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view SessionId, std::optional<HttpClientAccessToken> AccessToken) { + ZEN_TRACE_CPU("HttpClient::Impl::AllocSession"); cpr::Session* CprSession = nullptr; 
m_SessionLock.WithExclusiveLock([&] { if (!m_Sessions.empty()) @@ -694,6 +715,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, void HttpClient::Impl::ReleaseSession(cpr::Session* CprSession) { + ZEN_TRACE_CPU("HttpClient::Impl::ReleaseSession"); CprSession->SetUrl({}); CprSession->SetHeader({}); CprSession->SetBody({}); @@ -718,6 +740,7 @@ HttpClient::~HttpClient() bool HttpClient::Authenticate() { + ZEN_TRACE_CPU("HttpClient::Authenticate"); std::optional<HttpClientAccessToken> Token = GetAccessToken(); if (!Token) { @@ -729,6 +752,7 @@ HttpClient::Authenticate() const std::optional<HttpClientAccessToken> HttpClient::GetAccessToken() { + ZEN_TRACE_CPU("HttpClient::GetAccessToken"); if (!m_ConnectionSettings.AccessTokenProvider.has_value()) { return {}; diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h index 74da9ab05..05a23d675 100644 --- a/src/zenhttp/include/zenhttp/formatters.h +++ b/src/zenhttp/include/zenhttp/formatters.h @@ -73,9 +73,11 @@ struct fmt::formatter<cpr::Response> if (zen::IsHttpSuccessCode(Response.status_code)) { return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}", + "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}", Response.url.str(), Response.status_code, + Response.error.message, + int(Response.error.code), Response.uploaded_bytes, Response.downloaded_bytes, NiceResponseTime.c_str()); @@ -92,29 +94,35 @@ struct fmt::formatter<cpr::Response> zen::ExtendableStringBuilder<256> Sb; std::string_view Json = Obj.ToJson(Sb).ToView(); - return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", - Response.url.str(), - Response.status_code, - Response.uploaded_bytes, - Response.downloaded_bytes, - NiceResponseTime.c_str(), - Json, - Response.reason); + return fmt::format_to( + Ctx.out(), + "Url: {}, Status: {}, Error: '{}' ({}). 
Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.error.message, + int(Response.error.code), + Response.uploaded_bytes, + Response.downloaded_bytes, + NiceResponseTime.c_str(), + Json, + Response.reason); } else { zen::BodyLogFormatter Body(Response.text); - return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", - Response.url.str(), - Response.status_code, - Response.uploaded_bytes, - Response.downloaded_bytes, - NiceResponseTime.c_str(), - Body.GetText(), - Response.reason); + return fmt::format_to( + Ctx.out(), + "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.error.message, + int(Response.error.code), + Response.uploaded_bytes, + Response.downloaded_bytes, + NiceResponseTime.c_str(), + Body.GetText(), + Response.reason); } } } diff --git a/src/zenhttp/packageformat.cpp b/src/zenhttp/packageformat.cpp index ae80851e4..9d423ecbc 100644 --- a/src/zenhttp/packageformat.cpp +++ b/src/zenhttp/packageformat.cpp @@ -279,11 +279,10 @@ FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* Tar { IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ZEN_ASSERT(ObjIoBuffer.GetSize() > 0); - ResponseBuffers.emplace_back(std::move(ObjIoBuffer)); - *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; + ResponseBuffers.emplace_back(std::move(ObjIoBuffer)); } else if (const CompositeBuffer& AttachmentBinary = Attachment.AsCompositeBinary()) { @@ -500,30 +499,25 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (Entry.Flags & CbAttachmentEntry::kIsObject) { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); + if 
(!CompBuf) + { + // First payload is always a compact binary object + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); + } + CbObject AttachmentObject = LoadCompactBinaryObject(std::move(CompBuf)); if (i == 0) { - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (CompBuf) - { - Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); - } - else - { - // First payload is always a compact binary object - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } + // First payload is always a compact binary object + Package.SetObject(AttachmentObject); } else { - MalformedAttachments.push_back(std::make_pair( - i, - fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); + Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash)); } } else @@ -547,17 +541,14 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (Entry.Flags & CbAttachmentEntry::kIsObject) { + CbObject AttachmentObject = LoadCompactBinaryObject(AttachmentBuffer); if (i == 0) { - Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); + Package.SetObject(AttachmentObject); } else { - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); + Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash)); } } else if (AttachmentSize > 0) diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 
2888f5450..0da6e31ad 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -20,6 +20,7 @@ #include <zenstore/cidstore.h> #include <zenstore/gc.h> +#include <zenstore/buildstore/buildstore.h> #include <zenstore/cache/structuredcachestore.h> #include <zenutil/workerpools.h> #include "config.h" @@ -105,6 +106,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, ZenCacheStore* CacheStore, CidStore* CidStore, ProjectStore* ProjectStore, + BuildStore* BuildStore, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions) : m_GcScheduler(Scheduler) @@ -112,6 +114,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, , m_CacheStore(CacheStore) , m_CidStore(CidStore) , m_ProjectStore(ProjectStore) +, m_BuildStore(BuildStore) , m_LogPaths(LogPaths) , m_ServerOptions(ServerOptions) { @@ -306,6 +309,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, Response << "Interval" << ToTimeSpan(State.Config.Interval); Response << "MaxCacheDuration" << ToTimeSpan(State.Config.MaxCacheDuration); Response << "MaxProjectStoreDuration" << ToTimeSpan(State.Config.MaxProjectStoreDuration); + Response << "MaxBuildStoreDuration" << ToTimeSpan(State.Config.MaxBuildStoreDuration); Response << "CollectSmallObjects" << State.Config.CollectSmallObjects; Response << "Enabled" << State.Config.Enabled; Response << "DiskReserveSize" << NiceBytes(State.Config.DiskReserveSize); @@ -401,6 +405,14 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, } } + if (auto Param = Params.GetValue("maxbuildstoreduration"); Param.empty() == false) + { + if (auto Value = ParseInt<uint64_t>(Param)) + { + GcParams.MaxBuildStoreDuration = std::chrono::seconds(Value.value()); + } + } + if (auto Param = Params.GetValue("disksizesoftlimit"); Param.empty() == false) { if (auto Value = ParseInt<uint64_t>(Param)) @@ -782,6 +794,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, { m_ProjectStore->Flush(); } + if (m_BuildStore) + { + 
m_BuildStore->Flush(); + } HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kPost); diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index 563c4f536..e7821dead 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -12,6 +12,7 @@ class JobQueue; class ZenCacheStore; class CidStore; class ProjectStore; +class BuildStore; struct ZenServerOptions; class HttpAdminService : public zen::HttpService @@ -28,6 +29,7 @@ public: ZenCacheStore* CacheStore, CidStore* CidStore, ProjectStore* ProjectStore, + BuildStore* BuildStore, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions); ~HttpAdminService(); @@ -42,6 +44,7 @@ private: ZenCacheStore* m_CacheStore; CidStore* m_CidStore; ProjectStore* m_ProjectStore; + BuildStore* m_BuildStore; LogPaths m_LogPaths; const ZenServerOptions& m_ServerOptions; }; diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp new file mode 100644 index 000000000..06bfea423 --- /dev/null +++ b/src/zenserver/buildstore/httpbuildstore.cpp @@ -0,0 +1,526 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "httpbuildstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/trace.h> +#include <zenhttp/packageformat.h> +#include <zenstore/buildstore/buildstore.h> +#include <zenutil/workerpools.h> + +#include <numeric> + +namespace zen { +using namespace std::literals; + +ZEN_DEFINE_LOG_CATEGORY_STATIC(LogBuilds, "builds"sv); + +HttpBuildStoreService::HttpBuildStoreService(HttpStatsService& StatsService, BuildStore& Store) +: m_Log(logging::Get("builds")) +, m_StatsService(StatsService) +, m_BuildStore(Store) +{ + Initialize(); +} + +HttpBuildStoreService::~HttpBuildStoreService() +{ +} + +const char* +HttpBuildStoreService::BaseUri() const +{ + return "/builds/"; +} + +void +HttpBuildStoreService::Initialize() +{ + ZEN_LOG_INFO(LogBuilds, "Initializing Builds Service"); + + m_StatsService.RegisterHandler("builds", *this); + + m_Router.AddPattern("namespace", "([[:alnum:]-_.]+)"); + m_Router.AddPattern("bucket", "([[:alnum:]-_.]+)"); + m_Router.AddPattern("buildid", "([[:xdigit:]]{24})"); + m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/{hash}", + [this](HttpRouterRequest& Req) { PutBlobRequest(Req); }, + HttpVerb::kPut); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/{hash}", + [this](HttpRouterRequest& Req) { GetBlobRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata", + [this](HttpRouterRequest& Req) { PutMetadataRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata", + [this](HttpRouterRequest& Req) { GetMetadatasRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/exists", + [this](HttpRouterRequest& Req) { 
BlobsExistsRequest(Req); }, + HttpVerb::kPost); +} + +void +HttpBuildStoreService::HandleRequest(zen::HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::HandleRequest"); + metrics::OperationTiming::Scope $(m_HttpRequests); + + m_BuildStoreStats.RequestCount++; + if (m_Router.HandleRequest(Request) == false) + { + ZEN_LOG_WARN(LogBuilds, "No route found for {0}", Request.RelativeUri()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); + } +} + +void +HttpBuildStoreService::PutBlobRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::PutBlobRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const std::string_view Namespace = Req.GetCapture(1); + const std::string_view Bucket = Req.GetCapture(2); + const std::string_view BuildId = Req.GetCapture(3); + const std::string_view Hash = Req.GetCapture(4); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoHash BlobHash; + if (!IoHash::TryParse(Hash, BlobHash)) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid blob hash '{}'", Hash)); + } + m_BuildStoreStats.BlobWriteCount++; + IoBuffer Payload = ServerRequest.ReadPayload(); + if (!Payload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Payload blob {} is empty", Hash)); + } + if (Payload.GetContentType() != HttpContentType::kCompressedBinary) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Payload blob {} content type {} is invalid", Hash, ToString(Payload.GetContentType()))); + } + m_BuildStore.PutBlob(BlobHash, ServerRequest.ReadPayload()); + // ZEN_INFO("Stored blob {}. 
Size: {}", BlobHash, ServerRequest.ReadPayload().GetSize()); + return ServerRequest.WriteResponse(HttpResponseCode::OK); +} + +void +HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::GetBlobRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + std::string_view Hash = Req.GetCapture(4); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoHash BlobHash; + if (!IoHash::TryParse(Hash, BlobHash)) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid blob hash '{}'", Hash)); + } + zen::HttpRanges Ranges; + bool HasRange = ServerRequest.TryGetRanges(Ranges); + if (Ranges.size() > 1) + { + // Only a single range is supported + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Multiple ranges in blob request is not supported"); + } + + m_BuildStoreStats.BlobReadCount++; + IoBuffer Blob = m_BuildStore.GetBlob(BlobHash); + if (!Blob) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Blob with hash '{}' could not be found", Hash)); + } + // ZEN_INFO("Fetched blob {}. Size: {}", BlobHash, Blob.GetSize()); + m_BuildStoreStats.BlobHitCount++; + if (HasRange) + { + const HttpRange& Range = Ranges.front(); + const uint64_t BlobSize = Blob.GetSize(); + const uint64_t MaxBlobSize = Range.Start < BlobSize ? 
Range.Start - BlobSize : 0; + const uint64_t RangeSize = Min(Range.End - Range.Start + 1, MaxBlobSize); + if (Range.Start + RangeSize >= BlobSize) + { + return ServerRequest.WriteResponse(HttpResponseCode::NoContent); + } + Blob = IoBuffer(Blob, Range.Start, RangeSize); + return ServerRequest.WriteResponse(HttpResponseCode::OK, ZenContentType::kBinary, Blob); + } + else + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob); + } +} + +void +HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::PutMetadataRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + + IoBuffer MetaPayload = ServerRequest.ReadPayload(); + if (MetaPayload.GetContentType() != ZenContentType::kCbPackage) + { + throw std::runtime_error(fmt::format("PutMetadataRequest payload has unexpected payload type '{}', expected '{}'", + ToString(MetaPayload.GetContentType()), + ToString(ZenContentType::kCbPackage))); + } + CbPackage Message = ParsePackageMessage(MetaPayload); + + CbObjectView MessageObject = Message.GetObject(); + if (!MessageObject) + { + throw std::runtime_error("PutMetadataRequest payload object is missing"); + } + CbArrayView BlobsArray = MessageObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadataArray = MessageObject["metadatas"sv].AsArrayView(); + + const uint64_t BlobCount = BlobsArray.Num(); + if (BlobCount == 0) + { + throw std::runtime_error("PutMetadataRequest blobs array is empty"); + } + if (BlobCount != MetadataArray.Num()) + { + throw std::runtime_error( + fmt::format("PutMetadataRequest metadata array size {} does not match blobs array size {}", MetadataArray.Num(), BlobCount)); + } + + std::vector<IoHash> BlobHashes; + std::vector<IoBuffer> MetadataPayloads; + + BlobHashes.reserve(BlobCount); + 
MetadataPayloads.reserve(BlobCount); + + auto BlobsArrayIt = begin(BlobsArray); + auto MetadataArrayIt = begin(MetadataArray); + while (BlobsArrayIt != end(BlobsArray)) + { + const IoHash BlobHash = (*BlobsArrayIt).AsHash(); + const IoHash MetadataHash = (*MetadataArrayIt).AsAttachment(); + + const CbAttachment* Attachment = Message.FindAttachment(MetadataHash); + if (Attachment == nullptr) + { + throw std::runtime_error(fmt::format("Blob metadata attachment {} is missing", MetadataHash)); + } + BlobHashes.push_back(BlobHash); + if (Attachment->IsObject()) + { + MetadataPayloads.push_back(Attachment->AsObject().GetBuffer().MakeOwned().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kCbObject); + } + else if (Attachment->IsCompressedBinary()) + { + MetadataPayloads.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kCompressedBinary); + } + else + { + ZEN_ASSERT(Attachment->IsBinary()); + MetadataPayloads.push_back(Attachment->AsBinary().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kBinary); + } + + BlobsArrayIt++; + MetadataArrayIt++; + } + m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads); + return ServerRequest.WriteResponse(HttpResponseCode::OK); +} + +void +HttpBuildStoreService::GetMetadatasRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::GetMetadatasRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoBuffer RequestPayload = ServerRequest.ReadPayload(); + if (!RequestPayload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Expected compact binary body for metadata request, body is missing"); + } + 
if (RequestPayload.GetContentType() != HttpContentType::kCbObject) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Expected compact binary body for metadata request, got {}", ToString(RequestPayload.GetContentType()))); + } + if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default); + ValidateError != CbValidateError::None) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for metadata request is not valid, reason: {}", ToString(ValidateError))); + } + CbObject RequestObject = LoadCompactBinaryObject(RequestPayload); + CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView(); + if (!BlobsArray) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary body for metadata request is missing 'blobHashes' array"); + } + const uint64_t BlobCount = BlobsArray.Num(); + + std::vector<IoHash> BlobRawHashes; + BlobRawHashes.reserve(BlobCount); + for (CbFieldView BlockHashView : BlobsArray) + { + BlobRawHashes.push_back(BlockHashView.AsHash()); + if (BlobRawHashes.back() == IoHash::Zero) + { + const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType(); + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for metadata 'blobHashes' array contains invalid field type: {}", Type)); + } + } + m_BuildStoreStats.BlobMetaReadCount += BlobRawHashes.size(); + std::vector<IoBuffer> BlockMetadatas = m_BuildStore.GetMetadatas(BlobRawHashes, &GetSmallWorkerPool(EWorkloadType::Burst)); + + CbPackage ResponsePackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + 
Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + if (BlockMetadatas[BlockHashIndex]) + { + const IoHash& BlockHash = BlobRawHashes[BlockHashIndex]; + ResponseWriter.AddHash(BlockHash); + } + } + ResponseWriter.EndArray(); // blobHashes + + ResponseWriter.BeginArray("metadatas"); + + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + if (IoBuffer Metadata = BlockMetadatas[BlockHashIndex]; Metadata) + { + switch (Metadata.GetContentType()) + { + case ZenContentType::kCbObject: + { + CbObject Object = CbObject(SharedBuffer(std::move(Metadata)).MakeOwned()); + const IoHash ObjectHash = Object.GetHash(); + ResponseWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(Object, ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + break; + case ZenContentType::kCompressedBinary: + { + IoHash RawHash; + uint64_t _; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Metadata)), RawHash, _); + ResponseWriter.AddBinaryAttachment(RawHash); + if (!AttachmentHashes.contains(RawHash)) + { + Attachments.push_back(CbAttachment(Compressed, RawHash)); + AttachmentHashes.insert(RawHash); + } + } + break; + default: + { + const IoHash RawHash = IoHash::HashBuffer(Metadata); + ResponseWriter.AddBinaryAttachment(RawHash); + if (!AttachmentHashes.contains(RawHash)) + { + Attachments.push_back(CbAttachment(SharedBuffer(Metadata), RawHash)); + AttachmentHashes.insert(RawHash); + } + } + break; + } + } + } + + ResponseWriter.EndArray(); // metadatas + + ResponsePackage.SetObject(ResponseWriter.Save()); + } + ResponsePackage.AddAttachments(Attachments); + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); 
+ ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); +} + +void +HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::BlobsExistsRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoBuffer RequestPayload = ServerRequest.ReadPayload(); + if (!RequestPayload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Expected compact binary body for blob exists request, body is missing"); + } + if (RequestPayload.GetContentType() != HttpContentType::kCbObject) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Expected compact binary body for blob exists request, got {}", ToString(RequestPayload.GetContentType()))); + } + if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default); + ValidateError != CbValidateError::None) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for blob exists request is not valid, reason: {}", ToString(ValidateError))); + } + CbObject RequestObject = LoadCompactBinaryObject(RequestPayload); + CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView(); + if (!BlobsArray) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary body for blob exists request is missing 'blobHashes' array"); + } + + std::vector<IoHash> BlobRawHashes; + BlobRawHashes.reserve(BlobsArray.Num()); + for 
(CbFieldView BlockHashView : BlobsArray) + { + BlobRawHashes.push_back(BlockHashView.AsHash()); + if (BlobRawHashes.back() == IoHash::Zero) + { + // Count the malformed entry as a bad request, like the other validation failures in this handler + m_BuildStoreStats.BadRequestCount++; + const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType(); + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for blob exists request 'blobHashes' array contains invalid field type: {}", Type)); + } + } + + m_BuildStoreStats.BlobExistsCount += BlobRawHashes.size(); + std::vector<BuildStore::BlobExistsResult> BlobsExists = m_BuildStore.BlobsExists(BlobRawHashes); + CbObjectWriter ResponseWriter(9 * BlobsExists.size()); + ResponseWriter.BeginArray("blobExists"sv); + for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists) + { + ResponseWriter.AddBool(BlobExists.HasBody); + if (BlobExists.HasBody) + { + m_BuildStoreStats.BlobExistsBodyHitCount++; + } + } + ResponseWriter.EndArray(); // blobExists + ResponseWriter.BeginArray("metadataExists"sv); + for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists) + { + // Report metadata presence here; body presence is reported in the 'blobExists' array + ResponseWriter.AddBool(BlobExists.HasMetadata); + if (BlobExists.HasMetadata) + { + m_BuildStoreStats.BlobExistsMetaHitCount++; + } + } + ResponseWriter.EndArray(); // metadataExists + CbObject ResponseObject = ResponseWriter.Save(); + return ServerRequest.WriteResponse(HttpResponseCode::OK, ResponseObject); +} + +void +HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::Stats"); + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo.BeginObject("builds"); + { + Cbo.BeginObject("blobs"); + { + Cbo << "readcount" << m_BuildStoreStats.BlobReadCount << "writecount" << m_BuildStoreStats.BlobWriteCount << "hitcount" + << m_BuildStoreStats.BlobHitCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("metadata"); + { + Cbo << "readcount" << m_BuildStoreStats.BlobMetaReadCount << "writecount" << m_BuildStoreStats.BlobMetaWriteCount << "hitcount" +
<< m_BuildStoreStats.BlobMetaHitCount; + } + Cbo.EndObject(); + + Cbo << "requestcount" << m_BuildStoreStats.RequestCount; + Cbo << "badrequestcount" << m_BuildStoreStats.BadRequestCount; + } + Cbo.EndObject(); + + return Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +} // namespace zen diff --git a/src/zenserver/buildstore/httpbuildstore.h b/src/zenserver/buildstore/httpbuildstore.h new file mode 100644 index 000000000..a59aa882a --- /dev/null +++ b/src/zenserver/buildstore/httpbuildstore.h @@ -0,0 +1,65 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/stats.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/httpstats.h> + +#include <filesystem> + +namespace zen { + +class BuildStore; + +class HttpBuildStoreService final : public zen::HttpService, public IHttpStatsProvider +{ +public: + HttpBuildStoreService(HttpStatsService& StatsService, BuildStore& Store); + virtual ~HttpBuildStoreService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + +private: + struct BuildStoreStats + { + std::atomic_uint64_t BlobReadCount{}; + std::atomic_uint64_t BlobHitCount{}; + std::atomic_uint64_t BlobWriteCount{}; + std::atomic_uint64_t BlobMetaReadCount{}; + std::atomic_uint64_t BlobMetaHitCount{}; + std::atomic_uint64_t BlobMetaWriteCount{}; + std::atomic_uint64_t BlobExistsCount{}; + std::atomic_uint64_t BlobExistsBodyHitCount{}; + std::atomic_uint64_t BlobExistsMetaHitCount{}; + std::atomic_uint64_t RequestCount{}; + std::atomic_uint64_t BadRequestCount{}; + }; + + void Initialize(); + + inline LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + void PutBlobRequest(HttpRouterRequest& Req); + void GetBlobRequest(HttpRouterRequest& Req); + + void PutMetadataRequest(HttpRouterRequest& Req); + void GetMetadatasRequest(HttpRouterRequest& Req); + + void 
BlobsExistsRequest(HttpRouterRequest& Req); + + HttpRequestRouter m_Router; + + HttpStatsService& m_StatsService; + + BuildStore& m_BuildStore; + BuildStoreStats m_BuildStoreStats; + metrics::OperationTiming m_HttpRequests; +}; + +} // namespace zen diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 809092378..0da98210c 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -377,6 +377,9 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("server.objectstore.enabled"sv, ServerOptions.ObjectStoreEnabled, "objectstore-enabled"sv); LuaOptions.AddOption("server.objectstore.buckets"sv, ServerOptions.ObjectStoreConfig); + ////// buildsstore + LuaOptions.AddOption("server.buildstore.enabled"sv, ServerOptions.BuildStoreConfig.Enabled, "buildstore-enabled"sv); + ////// network LuaOptions.AddOption("network.httpserverclass"sv, ServerOptions.HttpServerConfig.ServerClass, "http"sv); LuaOptions.AddOption("network.httpserverthreads"sv, ServerOptions.HttpServerConfig.ThreadCount, "http-threads"sv); @@ -1031,6 +1034,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<std::vector<std::string>>(BucketConfigs), ""); + options.add_option("buildstore", + "", + "buildstore-enabled", + "Whether the builds store is enabled or not.", + cxxopts::value<bool>(ServerOptions.BuildStoreConfig.Enabled)->default_value("false"), + ""); + options.add_option("stats", "", "statsd", diff --git a/src/zenserver/config.h b/src/zenserver/config.h index c7781aada..a87b6f8b3 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -59,11 +59,17 @@ struct ZenProjectStoreEvictionPolicy int32_t MaxDurationSeconds = 7 * 24 * 60 * 60; }; +struct ZenBuildStoreEvictionPolicy +{ + int32_t MaxDurationSeconds = 3 * 24 * 60 * 60; +}; + struct ZenGcConfig { // ZenCasEvictionPolicy Cas; ZenCacheEvictionPolicy Cache; ZenProjectStoreEvictionPolicy ProjectStore; + ZenBuildStoreEvictionPolicy BuildStore; int32_t 
MonitorIntervalSeconds = 30; int32_t IntervalSeconds = 0; bool CollectSmallObjects = true; @@ -130,6 +136,11 @@ struct ZenProjectStoreConfig bool StoreProjectAttachmentMetaData = false; }; +struct ZenBuildStoreConfig +{ + bool Enabled = false; +}; + struct ZenWorkspacesConfig { bool Enabled = false; @@ -145,6 +156,7 @@ struct ZenServerOptions zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; ZenProjectStoreConfig ProjectStoreConfig; + ZenBuildStoreConfig BuildStoreConfig; ZenStatsConfig StatsConfig; ZenWorkspacesConfig WorksSpacesConfig; std::filesystem::path SystemRootDir; // System root directory (used for machine level config) diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 8a4b977ad..0b7fd0400 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -84,7 +84,7 @@ HttpWorkspacesService::HttpWorkspacesService(HttpStatsService& StatsService, con HttpWorkspacesService::~HttpWorkspacesService() { - m_StatsService.UnregisterHandler("prj", *this); + m_StatsService.UnregisterHandler("ws", *this); } const char* diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index f84bc0b00..03e269d49 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -23,6 +23,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> +#include <zenstore/buildstore/buildstore.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenstore/workspaces.h> @@ -262,6 +263,13 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen *m_Workspaces)); } + if (ServerOptions.BuildStoreConfig.Enabled) + { + BuildStoreConfig ObjCfg; + ObjCfg.RootDirectory = m_DataRoot / "builds"; + m_BuildStore = std::make_unique<BuildStore>(std::move(ObjCfg), m_GcManager); + } + if (ServerOptions.StructuredCacheConfig.Enabled) { 
InitializeStructuredCache(ServerOptions); @@ -310,6 +318,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_ObjStoreService); } + if (ServerOptions.BuildStoreConfig.Enabled) + { + m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatsService, *m_BuildStore); + m_Http->RegisterService(*m_BuildStoreService); + } + #if ZEN_WITH_VFS m_VfsService = std::make_unique<VfsService>(); m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); @@ -327,6 +341,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), + .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, @@ -347,6 +362,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_CacheStore.Get(), m_CidStore.get(), m_ProjectStore, + m_BuildStore.get(), HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, @@ -801,6 +817,9 @@ ZenServer::Cleanup() m_ObjStoreService.reset(); m_FrontendService.reset(); + m_BuildStoreService.reset(); + m_BuildStore = {}; + m_StructuredCacheService.reset(); m_UpstreamService.reset(); m_UpstreamCache.reset(); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 80054dc35..5cfa04ba1 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -25,6 +25,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include 
<zenstore/cache/structuredcachestore.h> #include <zenstore/gc.h> #include "admin/admin.h" +#include "buildstore/httpbuildstore.h" #include "cache/httpstructuredcache.h" #include "diag/diagsvcs.h" #include "frontend/frontend.h" @@ -127,6 +128,8 @@ private: Ref<ZenCacheStore> m_CacheStore; std::unique_ptr<OpenProcessCache> m_OpenProcessCache; HttpTestService m_TestService; + std::unique_ptr<BuildStore> m_BuildStore; + #if ZEN_WITH_TESTS HttpTestingService m_TestingService; #endif @@ -140,6 +143,7 @@ private: HttpHealthService m_HealthService; std::unique_ptr<HttpFrontendService> m_FrontendService; std::unique_ptr<HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<HttpBuildStoreService> m_BuildStoreService; std::unique_ptr<VfsService> m_VfsService; std::unique_ptr<JobQueue> m_JobQueue; std::unique_ptr<HttpAdminService> m_AdminService; diff --git a/src/zenstore-test/zenstore-test.cpp b/src/zenstore-test/zenstore-test.cpp index e5b312984..c56971520 100644 --- a/src/zenstore-test/zenstore-test.cpp +++ b/src/zenstore-test/zenstore-test.cpp @@ -2,6 +2,7 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> +#include <zenstore/buildstore/buildstore.h> #include <zenstore/zenstore.h> #include <zencore/memory/newdelete.h> @@ -18,6 +19,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) zen::zenstore_forcelinktests(); zen::logging::InitializeLogging(); + zen::buildstore_forcelink(); zen::MaximizeOpenFileCount(); return ZEN_RUN_TESTS(argc, argv); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e976c061d..63c0388fa 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -578,7 +578,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons } void -BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) +BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) { 
ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::WriteChunks"); diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp new file mode 100644 index 000000000..8674aab75 --- /dev/null +++ b/src/zenstore/buildstore/buildstore.cpp @@ -0,0 +1,1475 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/buildstore/buildstore.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> + +#include <zencore/uid.h> +#include <zencore/xxhash.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/compress.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +namespace zen { +const FLLMTag& +GetBuildstoreTag() +{ + static FLLMTag _("store", FLLMTag("builds")); + + return _; +} + +using namespace std::literals; + +namespace blobstore::impl { + + const std::string BaseName = "builds"; + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + + std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + IndexExtension); + } + + std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); } + + std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + IndexExtension); + } + + std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + LogExtension); + } +} // namespace blobstore::impl + +BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) +: m_Config(Config) +, m_Gc(Gc) +, m_LargeBlobStore(m_Gc) +, 
m_SmallBlobStore(Gc) +, m_MetadataBlockStore() +{ + ZEN_TRACE_CPU("BuildStore::BuildStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + try + { + std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory); + std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory); + bool IsNew = !(std::filesystem::exists(BlobLogPath) && std::filesystem::exists(MetaLogPath)); + + if (!IsNew) + { + m_BlobLogFlushPosition = ReadPayloadLog(RwLock::ExclusiveLockScope(m_Lock), BlobLogPath, 0); + m_MetaLogFlushPosition = ReadMetadataLog(RwLock::ExclusiveLockScope(m_Lock), MetaLogPath, 0); + } + m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); + m_SmallBlobStore.Initialize(Config.RootDirectory, + "blob_cas", + m_Config.SmallBlobBlockStoreMaxBlockSize, + m_Config.SmallBlobBlockStoreAlignement, + IsNew); + m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); + { + BlockStore::BlockIndexSet KnownBlocks; + for (const BlobEntry& Blob : m_BlobEntries) + { + if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) + { + const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; + KnownBlocks.insert(Metadata.Location.BlockIndex); + } + } + m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + } + + m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); + m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); + + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to initialize build store. 
Reason: '{}'", Ex.what()); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + } +} + +BuildStore::~BuildStore() +{ + try + { + ZEN_TRACE_CPU("BuildStore::~BuildStore"); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + Flush(); + m_MetadatalogFile.Close(); + m_PayloadlogFile.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what()); + } +} + +void +BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) +{ + ZEN_TRACE_CPU("BuildStore::PutBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary); + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex BlobIndex = It->second; + if (m_BlobEntries[BlobIndex].Payload) + { + return; + } + } + } + + PayloadEntry Entry; + if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) + { + CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = PayloadEntry::kStandalone}; + } + else + { + CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = 0}; + } + m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Payload) + { + m_PayloadEntries[Blob.Payload] = Entry; + } + else + { + Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + 
m_PayloadEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // we only remove during GC and compact this then... + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } +} + +IoBuffer +BuildStore::GetBlob(const IoHash& BlobHash) +{ + ZEN_TRACE_CPU("BuildStore::GetBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + RwLock::SharedLockScope Lock(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + Blob.LastAccessTime = GcClock::TickCount(); + if (Blob.Payload) + { + const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; + const bool IsStandalone = (Entry.Flags & PayloadEntry::kStandalone) != 0; + Lock.ReleaseNow(); + + IoBuffer Chunk; + if (IsStandalone) + { + ZEN_TRACE_CPU("GetLarge"); + Chunk = m_LargeBlobStore.FindChunk(BlobHash); + } + else + { + ZEN_TRACE_CPU("GetSmall"); + Chunk = m_SmallBlobStore.FindChunk(BlobHash); + } + if (Chunk) + { + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return Chunk; + } + else + { + ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? 
"on disk" : "in block"); + } + } + } + return {}; +} + +std::vector<BuildStore::BlobExistsResult> +BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) +{ + ZEN_TRACE_CPU("BuildStore::BlobsExists"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BuildStore::BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + RwLock::SharedLockScope _(m_Lock); + for (const IoHash& BlobHash : BlobHashes) + { + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + bool HasPayload = !!Blob.Payload; + bool HasMetadata = !!Blob.Metadata; + Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); + } + else + { + Result.push_back({}); + } + } + return Result; +} + +void +BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas) +{ + ZEN_TRACE_CPU("BuildStore::PutMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + size_t WriteBlobIndex = 0; + m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) { + RwLock::ExclusiveLockScope _(m_Lock); + for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) + { + const IoBuffer& Data = MetaDatas[WriteBlobIndex]; + const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; + const BlockStoreLocation& Location = Locations[LocationIndex]; + + MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; + m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Metadata) + { + m_MetadataEntries[Blob.Metadata] = Entry; + } + else + { + Blob.Metadata = 
MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } + WriteBlobIndex++; + if (m_TrackedCacheKeys) + { + m_TrackedCacheKeys->insert(BlobHash); + } + } + }); +} + +std::vector<IoBuffer> +BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool) +{ + ZEN_TRACE_CPU("BuildStore::GetMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BlockStoreLocation> MetaLocations; + std::vector<size_t> MetaLocationResultIndexes; + MetaLocations.reserve(BlobHashes.size()); + MetaLocationResultIndexes.reserve(BlobHashes.size()); + tsl::robin_set<uint32_t> ReferencedBlocks; + + std::vector<IoBuffer> Result; + std::vector<ZenContentType> ResultContentTypes; + Result.resize(BlobHashes.size()); + ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType); + { + RwLock::SharedLockScope _(m_Lock); + for (size_t Index = 0; Index < BlobHashes.size(); Index++) + { + const IoHash& BlobHash = BlobHashes[Index]; + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlobEntry.Metadata) + { + const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; + MetaLocations.push_back(ExistingMetadataEntry.Location); + MetaLocationResultIndexes.push_back(Index); + ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); + ResultContentTypes[Index] 
= ExistingMetadataEntry.ContentType; + } + ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::Tick()); + } + } + } + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); + if (Chunk) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = std::move(Chunk); + } + } + return true; + } + return m_MetadataBlockStore.IterateBlock( + MetaLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data != nullptr) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = File.GetChunk(Offset, Size); + return true; + }, + 8u * 1024u); + }; + + if (!MetaLocations.empty()) + { + Latch WorkLatch(1); + + m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { + ZEN_UNUSED(BlockIndex); + if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + { + return DoOneBlock(ChunkIndexes); + } + else + { + ZEN_ASSERT(OptionalWorkerPool != nullptr); + WorkLatch.AddCount(1); + try + { + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + DoOneBlock(ChunkIndexes); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed getting metadata for {} chunks. 
Reason: {}", ChunkIndexes.size(), Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + WorkLatch.CountDown(); + ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + return true; + } + }); + + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + for (size_t Index = 0; Index < Result.size(); Index++) + { + if (Result[Index]) + { + Result[Index].SetContentType(ResultContentTypes[Index]); + } + } + return Result; +} + +void +BuildStore::Flush() +{ + ZEN_TRACE_CPU("BuildStore::Flush"); + try + { + m_LargeBlobStore.Flush(); + m_SmallBlobStore.Flush(); + m_MetadataBlockStore.Flush(false); + + m_PayloadlogFile.Flush(); + m_MetadatalogFile.Flush(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what()); + } +} + +void +BuildStore::CompactState() +{ + ZEN_TRACE_CPU("BuildStore::CompactState"); + + std::vector<BlobEntry> BlobEntries; + std::vector<PayloadEntry> PayloadEntries; + std::vector<MetadataEntry> MetadataEntries; + + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> BlobLookup; + + RwLock::ExclusiveLockScope _(m_Lock); + const size_t EntryCount = m_BlobLookup.size(); + BlobLookup.reserve(EntryCount); + const size_t PayloadCount = m_PayloadEntries.size(); + PayloadEntries.reserve(PayloadCount); + const size_t MetadataCount = m_MetadataEntries.size(); + MetadataEntries.reserve(MetadataCount); + + for (auto LookupIt : m_BlobLookup) + { + const IoHash& BlobHash = LookupIt.first; + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const BlobIndex WriteBlobIndex(gsl::narrow<uint32_t>(BlobEntries.size())); + BlobEntries.push_back(ReadBlobEntry); + BlobEntry& WriteBlobEntry = BlobEntries.back(); + + if (WriteBlobEntry.Payload) + { + const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload]; + WriteBlobEntry.Payload = 
PayloadIndex(gsl::narrow<uint32_t>(PayloadEntries.size())); + PayloadEntries.push_back(ReadPayloadEntry); + } + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata]; + WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow<uint32_t>(MetadataEntries.size())); + MetadataEntries.push_back(ReadMetadataEntry); + } + + BlobLookup.insert({BlobHash, WriteBlobIndex}); + } + m_BlobEntries.swap(BlobEntries); + m_PayloadEntries.swap(PayloadEntries); + m_MetadataEntries.swap(MetadataEntries); + m_BlobLookup.swap(BlobLookup); +} + +uint64_t +BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadPayloadLog"); + if (!std::filesystem::is_regular_file(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' payload log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<PayloadDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + std::filesystem::remove(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const PayloadDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & PayloadEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + m_BlobLookup.erase(Record.BlobHash); + return; + } + + if (!ValidatePayloadDiskEntry(Record, 
InvalidEntryReason)) + { + ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Payload) + { + const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload; + m_PayloadEntries[ExistingPayloadIndex] = Record.Entry; + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + ExistingBlob.Payload = NewPayloadIndex; + } + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // Replayed entries get the current tick as their access time, using the same call as PutBlob + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +uint64_t +BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadMetadataLog"); + if (!std::filesystem::is_regular_file(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' metadata log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<MetadataDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + std::filesystem::remove(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0;
+ } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const MetadataDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & MetadataEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get cleaned up in the compact gc operation + m_BlobLookup.erase(Record.BlobHash); + return; + } + + if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Metadata) + { + const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata; + m_MetadataEntries[ExistingMetadataIndex] = Record.Entry; + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + ExistingBlob.Metadata = NewMetadataIndex; + } + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::Tick())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +bool
+BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.Flags, Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & PayloadEntry::kTombStone) + { + return true; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0 || Entry.Entry.Reserved3 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +bool +BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Location.Size == 0) + { + OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); + return false; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & MetadataEntry::kTombStone) + { + return true; + } + if (Entry.Entry.ContentType == ZenContentType::kCOUNT) + { + OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +class BuildStoreGcReferenceChecker : public GcReferenceChecker +{ +public: + 
BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + + virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Builds::UpdateLockedState"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + m_References.reserve(m_Store.m_BlobLookup.size()); + for (const auto& It : m_Store.m_BlobLookup) + { + const BuildStore::BlobIndex ExistingBlobIndex = It.second; + if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) + { + m_References.push_back(It.first); + } + } + FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); + } + + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + ZEN_TRACE_CPU("Builds::GetUnusedReferences"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "buildstore", + UsedCount, + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; + } + +private: + BuildStore& m_Store; + std::vector<IoHash> m_References; +}; + +std::string +BuildStore::GetGcName(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); +} + +class BuildStoreGcCompator : public GcStoreCompactor +{ + using 
BlobEntry = BuildStore::BlobEntry; + using PayloadEntry = BuildStore::PayloadEntry; + using MetadataEntry = BuildStore::MetadataEntry; + using MetadataDiskEntry = BuildStore::MetadataDiskEntry; + using BlobIndex = BuildStore::BlobIndex; + using PayloadIndex = BuildStore::PayloadIndex; + using MetadataIndex = BuildStore::MetadataIndex; + +public: + BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override + { + ZEN_UNUSED(ClaimDiskReserveCallback); + ZEN_TRACE_CPU("Builds::CompactStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", + m_Store.m_Config.RootDirectory, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_RemovedBlobs.empty()) + { + if (Ctx.Settings.CollectSmallObjects) + { + m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); }); + auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); + + BlockStore::BlockUsageMap BlockUsage; + { + RwLock::SharedLockScope __(m_Store.m_Lock); + + for (auto LookupIt : m_Store.m_BlobLookup) + { + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; + + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; + + uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; + uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); + + if (auto BlockUsageIt = 
BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end()) + { + BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); + Info.EntryCount++; + Info.DiskUsage += ChunkSize; + } + else + { + BlockUsage.insert_or_assign(BlockIndex, + BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); + } + } + } + } + + BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); + BlockStoreCompactState BlockCompactState; + std::vector<IoHash> BlockCompactStateKeys; + BlockCompactState.IncludeBlocks(BlocksToCompact); + + if (BlocksToCompact.size() > 0) + { + { + RwLock::SharedLockScope ___(m_Store.m_Lock); + for (const auto& Entry : m_Store.m_BlobLookup) + { + BlobIndex Index = Entry.second; + + if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) + { + if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) + { + BlockCompactStateKeys.push_back(Entry.first); + } + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + + m_Store.m_MetadataBlockStore.CompactBlocks( + BlockCompactState, + m_Store.m_Config.MetadataBlockStoreAlignement, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<MetadataDiskEntry> MovedEntries; + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Store.m_Lock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + ZEN_ASSERT(m_Store.m_TrackedCacheKeys); + if (m_Store.m_TrackedCacheKeys->contains(Key)) + { + continue; + } + + if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) + { + const BlobIndex Index = It->second; + + if (MetadataIndex Meta = 
m_Store.m_BlobEntries[Index].Metadata; Meta) + { + m_Store.m_MetadataEntries[Meta].Location = Moved.second; + MovedEntries.push_back( + MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); + } + } + } + m_Store.m_MetadatalogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; + }, + ClaimDiskReserveCallback, + fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory)); + } + else + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + } + } + } + } + } + + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + +private: + BuildStore& m_Store; + const std::vector<IoHash> m_RemovedBlobs; +}; + +GcStoreCompactor* +BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Builds::RemoveExpiredData"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", + m_Config.RootDirectory, + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count(); + + std::vector<IoHash> ExpiredBlobs; + { + RwLock::SharedLockScope __(m_Lock); + for (const auto& It : m_BlobLookup) + { + const BlobIndex ReadBlobIndex = It.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = 
ReadBlobEntry.LastAccessTime; + if (AccessTick < ExpireTicks) + { + ExpiredBlobs.push_back(It.first); + } + } + Stats.CheckedCount += m_BlobLookup.size(); + Stats.FoundCount += ExpiredBlobs.size(); + } + + std::vector<IoHash> RemovedBlobs; + if (!ExpiredBlobs.empty()) + { + if (Ctx.Settings.IsDeleteMode) + { + RemovedBlobs.reserve(ExpiredBlobs.size()); + + std::vector<PayloadDiskEntry> RemovedPayloads; + std::vector<MetadataDiskEntry> RemoveMetadatas; + + RwLock::ExclusiveLockScope __(m_Lock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + + for (const IoHash& ExpiredBlob : ExpiredBlobs) + { + if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end()) + { + const BlobIndex ReadBlobIndex = It->second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; + + if (AccessTick < ExpireTicks) + { + if (ReadBlobEntry.Payload) + { + RemovedPayloads.push_back( + PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob}); + RemovedPayloads.back().Entry.Flags |= PayloadEntry::kTombStone; + m_PayloadEntries[ReadBlobEntry.Payload] = {}; + m_BlobEntries[ReadBlobIndex].Payload = {}; + } + if (ReadBlobEntry.Metadata) + { + RemoveMetadatas.push_back( + MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); + RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + m_MetadataEntries[ReadBlobEntry.Metadata] = {}; + m_BlobEntries[ReadBlobIndex].Metadata = {}; + } + + m_BlobLookup.erase(It); + + RemovedBlobs.push_back(ExpiredBlob); + Stats.DeletedCount++; + } + } + } + if (!RemovedPayloads.empty()) + { + m_PayloadlogFile.Append(RemovedPayloads); + } + if (!RemoveMetadatas.empty()) + { + m_MetadatalogFile.Append(RemoveMetadatas); + } + } + } + + if (!RemovedBlobs.empty()) + { + CompactState(); + } + + return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); +} + 
+std::vector<GcReferenceChecker*> +BuildStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + return {new BuildStoreGcReferenceChecker(*this)}; +} + +std::vector<GcReferenceValidator*> +BuildStore::CreateReferenceValidators(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + return {}; +} + +std::vector<RwLock::SharedLockScope> +BuildStore::LockState(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + return Locks; +} + +/* + ___________ __ + \__ ___/___ _______/ |_ ______ + | |_/ __ \ / ___/\ __\/ ___/ + | |\ ___/ \___ \ | | \___ \ + |____| \___ >____ > |__| /____ > + \/ \/ \/ +*/ + +#if ZEN_WITH_TESTS + +TEST_CASE("BuildStore.Blobs") +{ + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == 
RawHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } +} + +namespace blockstore::testing { + IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + { + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, BlobHash); + Writer.BeginObject("values"); + { + for (const 
auto& V : KeyValues) + { + Writer.AddString(V.first, V.second); + } + } + Writer.EndObject(); // values + return Writer.Save().GetBuffer().AsIoBuffer(); + }; + +} // namespace blockstore::testing + +TEST_CASE("BuildStore.Metadata") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> BlobHashes; + std::vector<IoBuffer> MetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); + MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(BlobHashes, MetaPayloads); + + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + for (const IoHash& BlobHash : BlobHashes) + { + CHECK(!Store.GetBlob(BlobHash)); + } + } + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + 
CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + for (const auto& MetadataIt : MetadataPayloads) + { + CHECK(!MetadataIt); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } + + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : 
CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + + BlobMetaPayloads.clear(); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back( + MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } +} + +TEST_CASE("BuildStore.GC") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + 
IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), + .CollectSmallObjects = false, + .IsDeleteMode = false, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(!Blob); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t 
I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(!MetadataPayload); + } + } + } +} + +void +buildstore_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2be0542db..b64bc26dd 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -226,7 +226,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } std::vector<CasStore::InsertResult> -CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes) { ZEN_MEMSCOPE(GetCasContainerTag()); @@ -323,7 +323,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, +CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 07e620086..2eb4c233a 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -52,11 +52,11 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore ~CasContainerStrategy(); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes); + std::vector<CasStore::InsertResult> InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - bool IterateChunks(std::span<IoHash> ChunkHashes, + bool IterateChunks(std::span<const IoHash> ChunkHashes, const 
std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 7ac10d613..fe5ae284b 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1081,7 +1081,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); { ZEN_TRACE_CPU("GcV2::LockReferencers"); - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceLockers Latch WorkLeft(1); { @@ -1108,7 +1108,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_TRACE_CPU("GcV2::UpdateLockedState"); // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceCheckers Latch WorkLeft(1); @@ -1739,6 +1739,7 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons { Writer << "CacheExpireTime"sv << ToDateTime(Settings.CacheExpireTime); Writer << "ProjectStoreExpireTime"sv << ToDateTime(Settings.ProjectStoreExpireTime); + Writer << "BuildStoreExpireTime"sv << ToDateTime(Settings.BuildStoreExpireTime); Writer << "CollectSmallObjects"sv << Settings.CollectSmallObjects; Writer << "IsDeleteMode"sv << Settings.IsDeleteMode; Writer << "SkipCidDelete"sv << Settings.SkipCidDelete; @@ -1940,6 +1941,7 @@ GcScheduler::SchedulerThread() std::chrono::seconds LightweightGcInterval = m_Config.LightweightInterval; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = 
m_Config.MaxProjectStoreDuration; + std::chrono::seconds MaxBuildStoreDuration = m_Config.MaxBuildStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; bool SkipCid = false; GcVersion UseGCVersion = m_Config.UseGCVersion; @@ -1975,6 +1977,10 @@ GcScheduler::SchedulerThread() { MaxProjectStoreDuration = TriggerParams.MaxProjectStoreDuration; } + if (TriggerParams.MaxBuildStoreDuration != std::chrono::seconds::max()) + { + MaxBuildStoreDuration = TriggerParams.MaxBuildStoreDuration; + } if (TriggerParams.DiskSizeSoftLimit != 0) { DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit; @@ -2046,6 +2052,8 @@ GcScheduler::SchedulerThread() MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; GcClock::TimePoint ProjectStoreExpireTime = MaxProjectStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxProjectStoreDuration; + GcClock::TimePoint BuildStoreExpireTime = + MaxBuildStoreDuration == GcClock::Duration::max() ? 
GcClock::TimePoint::min() : Now - MaxBuildStoreDuration; const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); @@ -2102,6 +2110,10 @@ GcScheduler::SchedulerThread() { ProjectStoreExpireTime = SizeBasedExpireTime; } + if (SizeBasedExpireTime > BuildStoreExpireTime) + { + BuildStoreExpireTime = SizeBasedExpireTime; + } } std::chrono::seconds RemainingTimeUntilGc = @@ -2227,6 +2239,7 @@ GcScheduler::SchedulerThread() bool GcSuccess = CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, + BuildStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid, @@ -2333,6 +2346,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time bool GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, @@ -2416,6 +2430,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcSettings Settings = {.CacheExpireTime = CacheExpireTime, .ProjectStoreExpireTime = ProjectStoreExpireTime, + .BuildStoreExpireTime = BuildStoreExpireTime, .CollectSmallObjects = CollectSmallObjects, .IsDeleteMode = Delete, .SkipCidDelete = SkipCid, @@ -2447,6 +2462,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, } SB.Append(fmt::format(" Cache cutoff time: {}\n", Settings.CacheExpireTime)); SB.Append(fmt::format(" Project store cutoff time: {}\n", Settings.ProjectStoreExpireTime)); + SB.Append(fmt::format(" Build store cutoff time: {}\n", Settings.BuildStoreExpireTime)); }; { @@ -2552,6 +2568,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, if (Delete) { GcClock::TimePoint KeepRangeStart = Min(CacheExpireTime, ProjectStoreExpireTime); + KeepRangeStart = Min(KeepRangeStart, BuildStoreExpireTime); m_LastGcExpireTime = KeepRangeStart; std::unique_lock Lock(m_GcMutex); 
m_DiskUsageWindow.KeepRange(KeepRangeStart.time_since_epoch().count(), GcClock::Duration::max().count()); diff --git a/src/zenstore/include/zenstore/accesstime.h b/src/zenstore/include/zenstore/accesstime.h new file mode 100644 index 000000000..a28dc908b --- /dev/null +++ b/src/zenstore/include/zenstore/accesstime.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenstore/gc.h> + +#include <gsl/gsl-lite.hpp> + +namespace zen { + +// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch +struct AccessTime +{ + explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} + AccessTime& operator=(GcClock::Tick Tick) noexcept + { + SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); + return *this; + } + operator GcClock::Tick() const noexcept + { + return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) + .count(); + } + + AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime& operator=(AccessTime&& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + AccessTime& operator=(const AccessTime& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + +private: + static uint32_t ToSeconds(GcClock::Tick Tick) + { + return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); + } + std::atomic_uint32_t SecondsSinceEpoch; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/blockstore.h 
b/src/zenstore/include/zenstore/blockstore.h index 97357e5cb..0c72a13aa 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -156,7 +156,7 @@ public: void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback); typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback; - void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); + void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; void Flush(bool ForceNewBlock); diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h new file mode 100644 index 000000000..302af5f9c --- /dev/null +++ b/src/zenstore/include/zenstore/buildstore/buildstore.h @@ -0,0 +1,186 @@ + +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenstore/blockstore.h> + +#include <zencore/iohash.h> +#include <zenstore/accesstime.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> +#include "../compactcas.h" +#include "../filecas.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +struct BuildStoreConfig +{ + std::filesystem::path RootDirectory; + uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024; + uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024; + uint32_t SmallBlobBlockStoreAlignement = 16; + uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024; + uint32_t MetadataBlockStoreAlignement = 8; +}; + +class BuildStore : public GcReferencer, public GcReferenceLocker //, public GcStorage +{ +public: + explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc); + virtual ~BuildStore(); + + void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload); + IoBuffer GetBlob(const IoHash& BlobHashes); + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes); + + void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas); + std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool); + + void Flush(); + +private: + void CompactState(); + + uint64_t ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + uint64_t ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + + //////// GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& 
Ctx) override; + + //////// GcReferenceLocker + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + +#pragma pack(push) +#pragma pack(1) + struct PayloadEntry + { + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value + + uint8_t Flags = 0; + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + }; + static_assert(sizeof(PayloadEntry) == 4); + + struct PayloadDiskEntry + { + PayloadEntry Entry; // 4 bytes + IoHash BlobHash; // 20 bytes + }; + static_assert(sizeof(PayloadDiskEntry) == 24); + + struct MetadataEntry + { + BlockStoreLocation Location; // 12 bytes + + ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + uint8_t Flags = 0; // 1 byte + + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + }; + static_assert(sizeof(MetadataEntry) == 16); + + struct MetadataDiskEntry + { + MetadataEntry Entry; // 16 bytes + IoHash BlobHash; // 20 bytes + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + uint8_t Reserved4 = 0; + }; + static_assert(sizeof(MetadataDiskEntry) == 40); + +#pragma pack(pop) + + static bool ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason); + static bool ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason); + + struct PayloadIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + PayloadIndex() = default; + explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const PayloadIndex& Other) const = default; + }; + + struct MetadataIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != 
std::numeric_limits<uint32_t>::max(); }; + MetadataIndex() = default; + explicit MetadataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const MetadataIndex& Other) const = default; + }; + + struct BlobIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + BlobIndex() = default; + explicit BlobIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const BlobIndex& Other) const = default; + }; + + struct BlobEntry + { + PayloadIndex Payload; + MetadataIndex Metadata; + AccessTime LastAccessTime; + }; + static_assert(sizeof(BlobEntry) == 12); + + const BuildStoreConfig m_Config; + GcManager& m_Gc; + + RwLock m_Lock; + + std::vector<PayloadEntry> m_PayloadEntries; + std::vector<MetadataEntry> m_MetadataEntries; + + std::vector<BlobEntry> m_BlobEntries; + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup; + + FileCasStrategy m_LargeBlobStore; + CasContainerStrategy m_SmallBlobStore; + BlockStore m_MetadataBlockStore; + + TCasLogFile<PayloadDiskEntry> m_PayloadlogFile; + TCasLogFile<MetadataDiskEntry> m_MetadatalogFile; + uint64_t m_BlobLogFlushPosition = 0; + uint64_t m_MetaLogFlushPosition = 0; + + std::unique_ptr<HashSet> m_TrackedCacheKeys; + + friend class BuildStoreGcReferenceChecker; + friend class BuildStoreGcReferencePruner; + friend class BuildStoreGcCompator; +}; + +void buildstore_forcelink(); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 05400c784..5a51718d3 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -5,6 +5,7 @@ #include "cacheshared.h" #include <zencore/stats.h> +#include <zenstore/accesstime.h> #include 
<zenstore/blockstore.h> #include <zenstore/caslog.h> diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 521c78bb1..ef1b803de 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -72,42 +72,4 @@ struct CacheContentStats bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); -////////////////////////////////////////////////////////////////////////// - -// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch -struct AccessTime -{ - explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} - AccessTime& operator=(GcClock::Tick Tick) noexcept - { - SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); - return *this; - } - operator GcClock::Tick() const noexcept - { - return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) - .count(); - } - - AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime& operator=(AccessTime&& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - AccessTime& operator=(const AccessTime& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - -private: - static uint32_t ToSeconds(GcClock::Tick Tick) - { - return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); - } - std::atomic_uint32_t SecondsSinceEpoch; -}; - } // namespace 
zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 3daae0a93..67aadef71 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -55,6 +55,7 @@ struct GcSettings { GcClock::TimePoint CacheExpireTime = GcClock::Now(); GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now(); + GcClock::TimePoint BuildStoreExpireTime = GcClock::Now(); bool CollectSmallObjects = false; bool IsDeleteMode = false; bool SkipCidDelete = false; @@ -412,6 +413,7 @@ struct GcSchedulerConfig std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; std::chrono::seconds MaxProjectStoreDuration{604800}; + std::chrono::seconds MaxBuildStoreDuration{604800}; bool CollectSmallObjects = true; bool Enabled = true; uint64_t DiskReserveSize = 1ul << 28; @@ -496,6 +498,7 @@ public: bool CollectSmallObjects = false; std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max(); + std::chrono::seconds MaxBuildStoreDuration = std::chrono::seconds::max(); uint64_t DiskSizeSoftLimit = 0; bool SkipCid = false; bool SkipDelete = false; @@ -528,6 +531,7 @@ private: void SchedulerThread(); bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..c95215889 --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,362 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenutil/buildstoragecache.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BackgroundWorkPool(1) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function<void()>&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. 
Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } 
+ + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override + { + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + 
ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector<CbObject> Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + 
BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} 
entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector<BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + + WorkerThreadPool m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic<bool> m_CancelBackgroundWork; +}; + +std::unique_ptr<BuildStorageCache> +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) +{ + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath); +} + +} // namespace zen diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp index f3c14edc4..abfc0fb63 100644 --- a/src/zenutil/chunkblock.cpp +++ b/src/zenutil/chunkblock.cpp @@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) return {}; } std::vector<ChunkBlockDescription> Result; - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + CbArrayView Blocks = 
BlocksObject["blocks"sv].AsArrayView(); Result.reserve(Blocks.Num()); for (CbFieldView BlockView : Blocks) { diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index 130fec355..f040e9ece 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -442,18 +442,19 @@ public: SimulateLatency(0, 0); } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(BuildId), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; DirectoryContent Content; GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); for (const std::filesystem::path& MetaDataFile : Content.Files) { IoHash ChunkHash; @@ -467,24 +468,28 @@ public: m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); } } } - SimulateLatency(0, sizeof(IoHash) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { 
m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); + for (const IoHash& BlockHash : BlockHashes) { std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); @@ -495,10 +500,12 @@ public: m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); } } - SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 2ebd65a00..f8c7c012c 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -54,9 +54,9 @@ public: uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0; - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0; - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; + virtual CbObject FindBlocks(const Oid& BuildId) = 0; + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0; }; diff --git a/src/zenutil/include/zenutil/buildstoragecache.h 
b/src/zenutil/include/zenutil/buildstoragecache.h new file mode 100644 index 000000000..08c936bf5 --- /dev/null +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -0,0 +1,52 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> + +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zenutil/chunkblock.h> + +namespace zen { + +class HttpClient; + +class BuildStorageCache +{ +public: + struct Statistics + { + std::atomic<uint64_t> TotalBytesRead = 0; + std::atomic<uint64_t> TotalBytesWritten = 0; + std::atomic<uint64_t> TotalRequestCount = 0; + std::atomic<uint64_t> TotalRequestTimeUs = 0; + std::atomic<uint64_t> TotalExecutionTimeUs = 0; + }; + + virtual ~BuildStorageCache() {} + + virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; +}; + +std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath); +} // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 758722156..cd28bdcb2 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ 
b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -27,7 +27,6 @@ public: { ZEN_MEMSCOPE(ELLMTag::Logging); - ZEN_MEMSCOPE(ELLMTag::Logging); std::error_code Ec; if (RotateOnOpen) { diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index d70fd8c00..b6d9e3990 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -49,7 +49,7 @@ public: { throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); } - return PayloadToJson("Failed listing builds"sv, ListResult.Response); + return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override @@ -66,7 +66,7 @@ public: { throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); + return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override @@ -81,7 +81,7 @@ public: { throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override @@ -134,7 +134,7 @@ public: GetBuildPartResult.Reason, GetBuildPartResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, 
const Oid& BuildPartId, const IoHash& PartHash) override @@ -289,7 +289,7 @@ public: } } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); @@ -301,10 +301,10 @@ public: { throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); } - return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response)); + return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); @@ -328,24 +328,7 @@ public: throw std::runtime_error( fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); } - std::vector<ChunkBlockDescription> UnorderedList = - ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response)); - tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; - for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) - { - const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; - BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); - } - std::vector<ChunkBlockDescription> SortedBlockDescriptions; - SortedBlockDescriptions.reserve(BlockDescriptionLookup.size()); - for (const IoHash& BlockHash : BlockHashes) - { - if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) - { - SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second])); - } - } - return SortedBlockDescriptions; + return 
PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } virtual void PutBuildPartStats(const Oid& BuildId, @@ -373,7 +356,7 @@ public: } private: - static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload) + static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { |