// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include namespace zen { namespace { std::string ConnectionSettingsToString(const HttpClientSettings& ClientSettings) { ExtendableStringBuilder<128> SB; SB << "\n LogCategory: " << ClientSettings.LogCategory; SB << "\n ConnectTimeout: " << ClientSettings.ConnectTimeout.count() << " ms"; SB << "\n Timeout: " << ClientSettings.Timeout.count() << " ms"; SB << "\n AccessTokenProvider: " << ClientSettings.AccessTokenProvider.has_value(); SB << "\n AssumeHttp2: " << ClientSettings.AssumeHttp2; SB << "\n AllowResume: " << ClientSettings.AllowResume; SB << "\n RetryCount: " << ClientSettings.RetryCount; SB << "\n SessionId: " << ClientSettings.SessionId.ToString(); SB << "\n Verbose: " << ClientSettings.Verbose; SB << "\n MaximumInMemoryDownloadSize: " << ClientSettings.MaximumInMemoryDownloadSize; return SB.ToString(); } } // namespace BuildStorageResolveResult ResolveBuildStorage(OperationLogOutput& Output, const HttpClientSettings& ClientSettings, std::string_view Host, std::string_view OverrideHost, std::string_view ZenCacheHost, ZenCacheResolveMode ZenResolveMode, bool Verbose) { bool AllowZenCacheDiscovery = ZenResolveMode == ZenCacheResolveMode::Discovery || ZenResolveMode == ZenCacheResolveMode::All; bool AllowLocalZenCache = ZenResolveMode == ZenCacheResolveMode::LocalHost || ZenResolveMode == ZenCacheResolveMode::All; auto GetHostNameFromUrl = [](std::string_view Url) -> std::string_view { std::string::size_type HostnameStart = 0; std::string::size_type HostnameLength = std::string::npos; if (auto StartPos = Url.find("//"); StartPos != std::string::npos) { HostnameStart = StartPos + 2; } if (auto EndPos = Url.find("/", HostnameStart); EndPos != std::string::npos) { HostnameLength = EndPos - HostnameStart; } if (auto EndPos = Url.find(":", HostnameStart); EndPos != std::string::npos) { HostnameLength = EndPos - HostnameStart; } return Url.substr(HostnameStart, HostnameLength); }; std::string HostUrl; std::string HostName; double HostLatencySec = -1.0; std::string CacheUrl; std::string CacheName; bool HostAssumeHttp2 = ClientSettings.AssumeHttp2; bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2; double CacheLatencySec = -1.0; JupiterServerDiscovery DiscoveryResponse; const std::string_view DiscoveryHost = Host.empty() ? OverrideHost : Host; if (OverrideHost.empty() || (ZenCacheHost.empty() && AllowZenCacheDiscovery)) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Querying servers at '{}/api/v1/status/servers'\n Connection settings:{}", DiscoveryHost, ConnectionSettingsToString(ClientSettings)); } DiscoveryResponse = DiscoverJupiterEndpoints(DiscoveryHost, ClientSettings); } if (!OverrideHost.empty()) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Testing server endpoint at '{}/health/live'. Assume http2: {}", OverrideHost, HostAssumeHttp2); } if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(OverrideHost, HostAssumeHttp2, ClientSettings.Verbose); TestResult.Success) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", OverrideHost); } HostUrl = OverrideHost; HostName = GetHostNameFromUrl(OverrideHost); HostLatencySec = TestResult.LatencySeconds; } else { throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", OverrideHost, TestResult.FailureReason)); } } else { if (DiscoveryResponse.ServerEndPoints.empty()) { throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", DiscoveryHost)); } for (const JupiterServerDiscovery::EndPoint& ServerEndpoint : DiscoveryResponse.ServerEndPoints) { if (!ServerEndpoint.BaseUrl.empty()) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Testing server endpoint at '{}/health/live'. Assume http2: {}", ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2); } if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2, ClientSettings.Verbose); TestResult.Success) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl); } HostUrl = ServerEndpoint.BaseUrl; HostAssumeHttp2 = ServerEndpoint.AssumeHttp2; HostName = ServerEndpoint.Name; HostLatencySec = TestResult.LatencySeconds; break; } else { ZEN_OPERATION_LOG_DEBUG(Output, "Unable to reach host {}. Reason: {}", ServerEndpoint.BaseUrl, TestResult.FailureReason); } } } if (HostUrl.empty()) { throw std::runtime_error(fmt::format("Failed to find any usable builds hosts out of {} using {}", DiscoveryResponse.ServerEndPoints.size(), DiscoveryHost)); } } if (ZenCacheHost.empty()) { if (AllowZenCacheDiscovery) { for (const JupiterServerDiscovery::EndPoint& CacheEndpoint : DiscoveryResponse.CacheEndPoints) { if (!CacheEndpoint.BaseUrl.empty()) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Testing cache endpoint at '{}/status/builds'. Assume http2: {}", CacheEndpoint.BaseUrl, CacheEndpoint.AssumeHttp2); } if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(CacheEndpoint.BaseUrl, CacheEndpoint.AssumeHttp2, ClientSettings.Verbose); TestResult.Success) { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl); } CacheUrl = CacheEndpoint.BaseUrl; CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2; CacheName = CacheEndpoint.Name; CacheLatencySec = TestResult.LatencySeconds; break; } } } } if (CacheUrl.empty() && AllowLocalZenCache) { ZenServerState State; if (State.InitializeReadOnly()) { State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { if (CacheUrl.empty()) { std::string ZenServerLocalHostUrl = fmt::format("http://127.0.0.1:{}", Entry.EffectiveListenPort.load()); if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenServerLocalHostUrl, /*AssumeHttp2*/ false, ClientSettings.Verbose); TestResult.Success) { CacheUrl = ZenServerLocalHostUrl; CacheAssumeHttp2 = false; CacheName = "localhost"; CacheLatencySec = TestResult.LatencySeconds; } } }); } } } else { if (Verbose) { ZEN_OPERATION_LOG_INFO(Output, "Testing cache endpoint at '{}/status/builds'. Assume http2: {}", ZenCacheHost, false); } if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenCacheHost, /*AssumeHttp2*/ false, ClientSettings.Verbose); TestResult.Success) { CacheUrl = ZenCacheHost; CacheName = GetHostNameFromUrl(ZenCacheHost); CacheLatencySec = TestResult.LatencySeconds; } else { ZEN_WARN("Unable to reach cache host {}. Reason: {}", ZenCacheHost, TestResult.FailureReason); } } return BuildStorageResolveResult{.HostUrl = HostUrl, .HostName = HostName, .HostAssumeHttp2 = HostAssumeHttp2, .HostLatencySec = HostLatencySec, .CacheUrl = CacheUrl, .CacheName = CacheName, .CacheAssumeHttp2 = CacheAssumeHttp2, .CacheLatencySec = CacheLatencySec}; } std::vector GetBlockDescriptions(OperationLogOutput& Output, BuildStorageBase& Storage, BuildStorageCache* OptionalCacheStorage, const Oid& BuildId, const Oid& BuildPartId, std::span BlockRawHashes, bool AttemptFallback, bool IsQuiet, bool IsVerbose) { using namespace std::literals; if (!IsQuiet) { ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size()); } Stopwatch GetBlockMetadataTimer; std::vector UnorderedList; tsl::robin_map BlockDescriptionLookup; if (OptionalCacheStorage && !BlockRawHashes.empty()) { std::vector CacheBlockMetadatas = OptionalCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); UnorderedList.reserve(CacheBlockMetadatas.size()); for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); CacheBlockMetadataIndex++) { const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); if (Description.BlockHash == IoHash::Zero) { ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote cache, skipping block"); } else { UnorderedList.emplace_back(std::move(Description)); } } for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) { const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); } } if (UnorderedList.size() < BlockRawHashes.size()) { std::vector RemainingBlockHashes; RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size()); for (const IoHash& BlockRawHash : BlockRawHashes) { if (!BlockDescriptionLookup.contains(BlockRawHash)) { RemainingBlockHashes.push_back(BlockRawHash); } } CbObject BlockMetadatas = Storage.GetBlockMetadatas(BuildId, RemainingBlockHashes); std::vector RemainingList; { CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView(); std::vector FoundBlockHashes; std::vector FoundBlockMetadatas; for (CbFieldView Block : BlocksArray) { ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); if (Description.BlockHash == IoHash::Zero) { ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote store, skipping block"); } else { if (OptionalCacheStorage) { UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize()); Block.CopyTo(MetaBuffer.GetMutableView()); CbObject BlockMetadata(MetaBuffer.MoveToShared()); FoundBlockHashes.push_back(Description.BlockHash); FoundBlockMetadatas.push_back(BlockMetadata); } RemainingList.emplace_back(std::move(Description)); } } if (OptionalCacheStorage && !FoundBlockHashes.empty()) { OptionalCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas); } } for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++) { const ChunkBlockDescription& Description = RemainingList[DescriptionIndex]; BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex); } UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end()); } std::vector Result; Result.reserve(BlockDescriptionLookup.size()); for (const IoHash& BlockHash : BlockRawHashes) { if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) { Result.push_back(std::move(UnorderedList[It->second])); } } if (!IsQuiet) { ZEN_OPERATION_LOG_INFO(Output, "GetBlockMetadata for {} took {}. Found {} blocks", BuildPartId, NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), Result.size()); } if (Result.size() != BlockRawHashes.size()) { std::string ErrorDescription = fmt::format("All required blocks could not be found, {} blocks does not have metadata in this context.", BlockRawHashes.size() - Result.size()); if (IsVerbose) { for (const IoHash& BlockHash : BlockRawHashes) { if (auto It = std::find_if(Result.begin(), Result.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); It == Result.end()) { ErrorDescription += fmt::format("\n {}", BlockHash); } } } if (AttemptFallback) { ZEN_OPERATION_LOG_WARN(Output, "{} Attemping fallback options.", ErrorDescription); std::vector AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); std::vector FoundBlocks = ParseChunkBlockDescriptionList(Storage.FindBlocks(BuildId, (uint64_t)-1)); for (const IoHash& BlockHash : BlockRawHashes) { if (auto It = std::find_if(Result.begin(), Result.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); It != Result.end()) { AugmentedBlockDescriptions.emplace_back(std::move(*It)); } else if (auto ListBlocksIt = std::find_if( FoundBlocks.begin(), FoundBlocks.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); ListBlocksIt != FoundBlocks.end()) { if (!IsQuiet) { ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash); } AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt)); } else { IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); } IoHash BlockRawHash; uint64_t BlockRawSize; CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), BlockRawHash, BlockRawSize); if (!CompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockHash)); } if (BlockRawHash != BlockHash) { throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", BlockHash, BlockRawHash)); } CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); if (!DecompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockHash)); } ChunkBlockDescription MissingChunkDescription = GetChunkBlockDescription(DecompressedBlockBuffer.Flatten(), BlockHash); AugmentedBlockDescriptions.emplace_back(std::move(MissingChunkDescription)); } } Result.swap(AugmentedBlockDescriptions); } else { throw std::runtime_error(ErrorDescription); } } return Result; } } // namespace zen