diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-06 19:15:41 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-06 19:15:41 +0100 |
| commit | b919cda2e0c065ba556912e9ab404a321362f8ab (patch) | |
| tree | 8192317ebb34469271d58644e942deb9c858d05f /src/zenremotestore/builds/buildstorageutil.cpp | |
| parent | merge 5.7.8 minimal release (#635) (diff) | |
| download | zen-b919cda2e0c065ba556912e9ab404a321362f8ab.tar.xz zen-b919cda2e0c065ba556912e9ab404a321362f8ab.zip | |
remotestore op refactorings (#637)
* broke out BuildLogOutput to separate file
* refactored out GetBlockDescriptions
* log progress improvements
* refactorings to accomodate oplog download operations
Diffstat (limited to 'src/zenremotestore/builds/buildstorageutil.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageutil.cpp | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp index 998529714..3680d1d83 100644 --- a/src/zenremotestore/builds/buildstorageutil.cpp +++ b/src/zenremotestore/builds/buildstorageutil.cpp @@ -2,9 +2,14 @@ #include <zenremotestore/builds/buildstorageutil.h> +#include <zencore/fmtutils.h> +#include <zencore/timer.h> +#include <zenremotestore/builds/buildstorage.h> #include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/jupiterbuildstorage.h> +#include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/jupiter/jupiterhost.h> +#include <zenremotestore/operationlogoutput.h> #include <zenutil/zenserverprocess.h> namespace zen { @@ -154,4 +159,209 @@ ResolveBuildStorage(const HttpClientSettings& ClientSettings, .CacheName = CacheName, .CacheAssumeHttp2 = CacheAssumeHttp2}; } + +std::vector<ChunkBlockDescription> +GetBlockDescriptions(OperationLogOutput& Output, + BuildStorageBase& Storage, + BuildStorageCache* OptionalCacheStorage, + const Oid& BuildId, + const Oid& BuildPartId, + std::span<const IoHash> BlockRawHashes, + bool AttemptFallback, + bool IsQuiet, + bool IsVerbose) +{ + using namespace std::literals; + + if (!IsQuiet) + { + ZEN_CONSOLE("Fetching metadata for {} blocks", BlockRawHashes.size()); + } + + Stopwatch GetBlockMetadataTimer; + + std::vector<ChunkBlockDescription> UnorderedList; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; + if (OptionalCacheStorage && !BlockRawHashes.empty()) + { + std::vector<CbObject> CacheBlockMetadatas = OptionalCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); + UnorderedList.reserve(CacheBlockMetadatas.size()); + for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); CacheBlockMetadataIndex++) + { + const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; + ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); + if (Description.BlockHash == IoHash::Zero) + { + ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote cache, skipping block"); + } + else + { + UnorderedList.emplace_back(std::move(Description)); + } + } + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + } + } + + if (UnorderedList.size() < BlockRawHashes.size()) + { + std::vector<IoHash> RemainingBlockHashes; + RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size()); + for (const IoHash& BlockRawHash : BlockRawHashes) + { + if (!BlockDescriptionLookup.contains(BlockRawHash)) + { + RemainingBlockHashes.push_back(BlockRawHash); + } + } + CbObject BlockMetadatas = Storage.GetBlockMetadatas(BuildId, RemainingBlockHashes); + std::vector<ChunkBlockDescription> RemainingList; + { + CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView(); + std::vector<IoHash> FoundBlockHashes; + std::vector<CbObject> FoundBlockMetadatas; + for (CbFieldView Block : BlocksArray) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); + + if (Description.BlockHash == IoHash::Zero) + { + ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote store, skipping block"); + } + else + { + if (OptionalCacheStorage) + { + UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize()); + Block.CopyTo(MetaBuffer.GetMutableView()); + CbObject BlockMetadata(MetaBuffer.MoveToShared()); + + FoundBlockHashes.push_back(Description.BlockHash); + FoundBlockMetadatas.push_back(BlockMetadata); + } + RemainingList.emplace_back(std::move(Description)); + } + } + if (OptionalCacheStorage && !FoundBlockHashes.empty()) + { + OptionalCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas); + } + } + + for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = RemainingList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex); + } + UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end()); + } + + std::vector<ChunkBlockDescription> Result; + Result.reserve(BlockDescriptionLookup.size()); + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) + { + Result.push_back(std::move(UnorderedList[It->second])); + } + } + + if (!IsQuiet) + { + ZEN_OPERATION_LOG_INFO(Output, + "GetBlockMetadata for {} took {}. Found {} blocks", + BuildPartId, + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), + Result.size()); + } + + if (Result.size() != BlockRawHashes.size()) + { + std::string ErrorDescription = + fmt::format("All required blocks could not be found, {} blocks does not have metadata in this context.", + BlockRawHashes.size() - Result.size()); + if (IsVerbose) + { + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = + std::find_if(Result.begin(), + Result.end(), + [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); + It == Result.end()) + { + ErrorDescription += fmt::format("\n {}", BlockHash); + } + } + } + if (AttemptFallback) + { + ZEN_OPERATION_LOG_WARN(Output, "{} Attemping fallback options.", ErrorDescription); + std::vector<ChunkBlockDescription> AugmentedBlockDescriptions; + AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); + std::vector<ChunkBlockDescription> FoundBlocks = ParseChunkBlockDescriptionList(Storage.FindBlocks(BuildId, (uint64_t)-1)); + + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = + std::find_if(Result.begin(), + Result.end(), + [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); + It != Result.end()) + { + AugmentedBlockDescriptions.emplace_back(std::move(*It)); + } + else if (auto ListBlocksIt = std::find_if( + FoundBlocks.begin(), + FoundBlocks.end(), + [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); + ListBlocksIt != FoundBlocks.end()) + { + ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash); + AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt)); + } + else + { + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); + } + IoHash BlockRawHash; + uint64_t BlockRawSize; + CompressedBuffer CompressedBlockBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), BlockRawHash, BlockRawSize); + if (!CompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockHash)); + } + + if (BlockRawHash != BlockHash) + { + throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", BlockHash, BlockRawHash)); + } + + CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); + if (!DecompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockHash)); + } + + ChunkBlockDescription MissingChunkDescription = GetChunkBlockDescription(DecompressedBlockBuffer.Flatten(), BlockHash); + AugmentedBlockDescriptions.emplace_back(std::move(MissingChunkDescription)); + } + } + Result.swap(AugmentedBlockDescriptions); + } + else + { + throw std::runtime_error(ErrorDescription); + } + } + return Result; +} + } // namespace zen |