aboutsummaryrefslogtreecommitdiff
path: root/github_client.py
diff options
context:
space:
mode:
authorFuwn <[email protected]>2025-12-09 23:16:23 -0800
committerFuwn <[email protected]>2025-12-09 23:16:23 -0800
commit3ffcdb247df3f56c4c21c6fed83ee1af5fb94224 (patch)
tree409fe42bb385ca73bd1b152623465ee098434179 /github_client.py
downloadmathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.tar.xz
mathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.zip
feat: Initial commitHEADmain
Diffstat (limited to 'github_client.py')
-rw-r--r--github_client.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/github_client.py b/github_client.py
new file mode 100644
index 0000000..153ca39
--- /dev/null
+++ b/github_client.py
@@ -0,0 +1,131 @@
+"""
+GitHub API client for fetching repository data, issues, and pull requests.
+"""
+import requests
+import time
+from typing import List, Dict, Optional
+from config import GITHUB_TOKEN, GITHUB_API_BASE
+
+
+class GitHubClient:
+ """Client for interacting with GitHub API."""
+
+ def __init__(self, token: Optional[str] = None):
+ self.token = token or GITHUB_TOKEN
+ self.headers = {
+ 'Accept': 'application/vnd.github.v3+json',
+ }
+ if self.token:
+ self.headers['Authorization'] = f'token {self.token}'
+ self.session = requests.Session()
+ self.session.headers.update(self.headers)
+
+ def _make_request(self, url: str, params: Optional[Dict] = None) -> Dict:
+ """Make a request to GitHub API with rate limiting."""
+ response = self.session.get(url, params=params)
+
+ # Handle rate limiting
+ if response.status_code == 403 and 'rate limit' in response.text.lower():
+ reset_time = int(response.headers.get('X-RateLimit-Reset', time.time() + 60))
+ wait_time = max(0, reset_time - int(time.time()))
+ print(f"Rate limited. Waiting {wait_time} seconds...")
+ time.sleep(wait_time)
+ response = self.session.get(url, params=params)
+
+ response.raise_for_status()
+ return response.json()
+
+ def search_repositories(self, query: str, sort: str = 'stars', order: str = 'desc',
+ per_page: int = 10) -> List[Dict]:
+ """Search for repositories matching the query."""
+ url = f"{GITHUB_API_BASE}/search/repositories"
+ params = {
+ 'q': query,
+ 'sort': sort,
+ 'order': order,
+ 'per_page': per_page
+ }
+ results = self._make_request(url, params=params)
+ return results.get('items', [])
+
+ def get_repository_info(self, owner: str, repo: str) -> Dict:
+ """Get detailed information about a repository."""
+ url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}"
+ return self._make_request(url)
+
+ def get_repository_issues(self, owner: str, repo: str, state: str = 'all',
+ per_page: int = 100) -> List[Dict]:
+ """Get all issues for a repository."""
+ url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/issues"
+ all_issues = []
+ page = 1
+
+ while True:
+ params = {
+ 'state': state,
+ 'per_page': per_page,
+ 'page': page
+ }
+ issues = self._make_request(url, params=params)
+
+ if not issues:
+ break
+
+ # Filter out pull requests (they appear in issues endpoint)
+ issues = [issue for issue in issues if 'pull_request' not in issue]
+ all_issues.extend(issues)
+
+ if len(issues) < per_page:
+ break
+
+ page += 1
+ time.sleep(0.5) # Be nice to the API
+
+ return all_issues
+
+ def get_repository_pulls(self, owner: str, repo: str, state: str = 'all',
+ per_page: int = 100) -> List[Dict]:
+ """Get all pull requests for a repository."""
+ url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/pulls"
+ all_pulls = []
+ page = 1
+
+ while True:
+ params = {
+ 'state': state,
+ 'per_page': per_page,
+ 'page': page
+ }
+ pulls = self._make_request(url, params=params)
+
+ if not pulls:
+ break
+
+ all_pulls.extend(pulls)
+
+ if len(pulls) < per_page:
+ break
+
+ page += 1
+ time.sleep(0.5)
+
+ return all_pulls
+
+ def get_file_issues(self, owner: str, repo: str, file_path: str) -> Dict:
+ """Get issues and PRs related to a specific file."""
+ # Search issues mentioning the file
+ query = f'repo:{owner}/{repo} {file_path}'
+ url = f"{GITHUB_API_BASE}/search/issues"
+ params = {'q': query, 'per_page': 100}
+
+ try:
+ results = self._make_request(url, params=params)
+ return {
+ 'issues': len([item for item in results.get('items', [])
+ if 'pull_request' not in item]),
+ 'pulls': len([item for item in results.get('items', [])
+ if 'pull_request' in item])
+ }
+ except:
+ return {'issues': 0, 'pulls': 0}
+