aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageutil.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-06 19:15:41 +0100
committerGitHub Enterprise <[email protected]>2025-11-06 19:15:41 +0100
commitb919cda2e0c065ba556912e9ab404a321362f8ab (patch)
tree8192317ebb34469271d58644e942deb9c858d05f /src/zenremotestore/builds/buildstorageutil.cpp
parentmerge 5.7.8 minimal release (#635) (diff)
downloadzen-b919cda2e0c065ba556912e9ab404a321362f8ab.tar.xz
zen-b919cda2e0c065ba556912e9ab404a321362f8ab.zip
remotestore op refactorings (#637)
* broke out BuildLogOutput to separate file * refactored out GetBlockDescriptions * log progress improvements * refactorings to accomodate oplog download operations
Diffstat (limited to 'src/zenremotestore/builds/buildstorageutil.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageutil.cpp210
1 files changed, 210 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp
index 998529714..3680d1d83 100644
--- a/src/zenremotestore/builds/buildstorageutil.cpp
+++ b/src/zenremotestore/builds/buildstorageutil.cpp
@@ -2,9 +2,14 @@
#include <zenremotestore/builds/buildstorageutil.h>
+#include <zencore/fmtutils.h>
+#include <zencore/timer.h>
+#include <zenremotestore/builds/buildstorage.h>
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
+#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/jupiter/jupiterhost.h>
+#include <zenremotestore/operationlogoutput.h>
#include <zenutil/zenserverprocess.h>
namespace zen {
@@ -154,4 +159,209 @@ ResolveBuildStorage(const HttpClientSettings& ClientSettings,
.CacheName = CacheName,
.CacheAssumeHttp2 = CacheAssumeHttp2};
}
+
+std::vector<ChunkBlockDescription>
+GetBlockDescriptions(OperationLogOutput& Output,
+ BuildStorageBase& Storage,
+ BuildStorageCache* OptionalCacheStorage,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ std::span<const IoHash> BlockRawHashes,
+ bool AttemptFallback,
+ bool IsQuiet,
+ bool IsVerbose)
+{
+ using namespace std::literals;
+
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("Fetching metadata for {} blocks", BlockRawHashes.size());
+ }
+
+ Stopwatch GetBlockMetadataTimer;
+
+ std::vector<ChunkBlockDescription> UnorderedList;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
+ if (OptionalCacheStorage && !BlockRawHashes.empty())
+ {
+ std::vector<CbObject> CacheBlockMetadatas = OptionalCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
+ UnorderedList.reserve(CacheBlockMetadatas.size());
+ for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); CacheBlockMetadataIndex++)
+ {
+ const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex];
+ ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote cache, skipping block");
+ }
+ else
+ {
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ }
+ }
+
+ if (UnorderedList.size() < BlockRawHashes.size())
+ {
+ std::vector<IoHash> RemainingBlockHashes;
+ RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size());
+ for (const IoHash& BlockRawHash : BlockRawHashes)
+ {
+ if (!BlockDescriptionLookup.contains(BlockRawHash))
+ {
+ RemainingBlockHashes.push_back(BlockRawHash);
+ }
+ }
+ CbObject BlockMetadatas = Storage.GetBlockMetadatas(BuildId, RemainingBlockHashes);
+ std::vector<ChunkBlockDescription> RemainingList;
+ {
+ CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView();
+ std::vector<IoHash> FoundBlockHashes;
+ std::vector<CbObject> FoundBlockMetadatas;
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote store, skipping block");
+ }
+ else
+ {
+ if (OptionalCacheStorage)
+ {
+ UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize());
+ Block.CopyTo(MetaBuffer.GetMutableView());
+ CbObject BlockMetadata(MetaBuffer.MoveToShared());
+
+ FoundBlockHashes.push_back(Description.BlockHash);
+ FoundBlockMetadatas.push_back(BlockMetadata);
+ }
+ RemainingList.emplace_back(std::move(Description));
+ }
+ }
+ if (OptionalCacheStorage && !FoundBlockHashes.empty())
+ {
+ OptionalCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas);
+ }
+ }
+
+ for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = RemainingList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex);
+ }
+ UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end());
+ }
+
+ std::vector<ChunkBlockDescription> Result;
+ Result.reserve(BlockDescriptionLookup.size());
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
+ {
+ Result.push_back(std::move(UnorderedList[It->second]));
+ }
+ }
+
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output,
+ "GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ Result.size());
+ }
+
+ if (Result.size() != BlockRawHashes.size())
+ {
+ std::string ErrorDescription =
+ fmt::format("All required blocks could not be found, {} blocks does not have metadata in this context.",
+ BlockRawHashes.size() - Result.size());
+ if (IsVerbose)
+ {
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It =
+ std::find_if(Result.begin(),
+ Result.end(),
+ [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; });
+ It == Result.end())
+ {
+ ErrorDescription += fmt::format("\n {}", BlockHash);
+ }
+ }
+ }
+ if (AttemptFallback)
+ {
+ ZEN_OPERATION_LOG_WARN(Output, "{} Attemping fallback options.", ErrorDescription);
+ std::vector<ChunkBlockDescription> AugmentedBlockDescriptions;
+ AugmentedBlockDescriptions.reserve(BlockRawHashes.size());
+ std::vector<ChunkBlockDescription> FoundBlocks = ParseChunkBlockDescriptionList(Storage.FindBlocks(BuildId, (uint64_t)-1));
+
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It =
+ std::find_if(Result.begin(),
+ Result.end(),
+ [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; });
+ It != Result.end())
+ {
+ AugmentedBlockDescriptions.emplace_back(std::move(*It));
+ }
+ else if (auto ListBlocksIt = std::find_if(
+ FoundBlocks.begin(),
+ FoundBlocks.end(),
+ [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; });
+ ListBlocksIt != FoundBlocks.end())
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash);
+ AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt));
+ }
+ else
+ {
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash));
+ }
+ IoHash BlockRawHash;
+ uint64_t BlockRawSize;
+ CompressedBuffer CompressedBlockBuffer =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), BlockRawHash, BlockRawSize);
+ if (!CompressedBlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockHash));
+ }
+
+ if (BlockRawHash != BlockHash)
+ {
+ throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", BlockHash, BlockRawHash));
+ }
+
+ CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
+ if (!DecompressedBlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockHash));
+ }
+
+ ChunkBlockDescription MissingChunkDescription = GetChunkBlockDescription(DecompressedBlockBuffer.Flatten(), BlockHash);
+ AugmentedBlockDescriptions.emplace_back(std::move(MissingChunkDescription));
+ }
+ }
+ Result.swap(AugmentedBlockDescriptions);
+ }
+ else
+ {
+ throw std::runtime_error(ErrorDescription);
+ }
+ }
+ return Result;
+}
+
} // namespace zen