""" GitHub API client for fetching repository data, issues, and pull requests. """ import requests import time from typing import List, Dict, Optional from config import GITHUB_TOKEN, GITHUB_API_BASE class GitHubClient: """Client for interacting with GitHub API.""" def __init__(self, token: Optional[str] = None): self.token = token or GITHUB_TOKEN self.headers = { 'Accept': 'application/vnd.github.v3+json', } if self.token: self.headers['Authorization'] = f'token {self.token}' self.session = requests.Session() self.session.headers.update(self.headers) def _make_request(self, url: str, params: Optional[Dict] = None) -> Dict: """Make a request to GitHub API with rate limiting.""" response = self.session.get(url, params=params) # Handle rate limiting if response.status_code == 403 and 'rate limit' in response.text.lower(): reset_time = int(response.headers.get('X-RateLimit-Reset', time.time() + 60)) wait_time = max(0, reset_time - int(time.time())) print(f"Rate limited. Waiting {wait_time} seconds...") time.sleep(wait_time) response = self.session.get(url, params=params) response.raise_for_status() return response.json() def search_repositories(self, query: str, sort: str = 'stars', order: str = 'desc', per_page: int = 10) -> List[Dict]: """Search for repositories matching the query.""" url = f"{GITHUB_API_BASE}/search/repositories" params = { 'q': query, 'sort': sort, 'order': order, 'per_page': per_page } results = self._make_request(url, params=params) return results.get('items', []) def get_repository_info(self, owner: str, repo: str) -> Dict: """Get detailed information about a repository.""" url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}" return self._make_request(url) def get_repository_issues(self, owner: str, repo: str, state: str = 'all', per_page: int = 100) -> List[Dict]: """Get all issues for a repository.""" url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/issues" all_issues = [] page = 1 while True: params = { 'state': state, 'per_page': per_page, 'page': page } issues = self._make_request(url, params=params) if not issues: break # Filter out pull requests (they appear in issues endpoint) issues = [issue for issue in issues if 'pull_request' not in issue] all_issues.extend(issues) if len(issues) < per_page: break page += 1 time.sleep(0.5) # Be nice to the API return all_issues def get_repository_pulls(self, owner: str, repo: str, state: str = 'all', per_page: int = 100) -> List[Dict]: """Get all pull requests for a repository.""" url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/pulls" all_pulls = [] page = 1 while True: params = { 'state': state, 'per_page': per_page, 'page': page } pulls = self._make_request(url, params=params) if not pulls: break all_pulls.extend(pulls) if len(pulls) < per_page: break page += 1 time.sleep(0.5) return all_pulls def get_file_issues(self, owner: str, repo: str, file_path: str) -> Dict: """Get issues and PRs related to a specific file.""" # Search issues mentioning the file query = f'repo:{owner}/{repo} {file_path}' url = f"{GITHUB_API_BASE}/search/issues" params = {'q': query, 'per_page': 100} try: results = self._make_request(url, params=params) return { 'issues': len([item for item in results.get('items', []) if 'pull_request' not in item]), 'pulls': len([item for item in results.get('items', []) if 'pull_request' in item]) } except: return {'issues': 0, 'pulls': 0}