"""
Data collection pipeline that combines code metrics with git commit log data.
"""

import os
import re
import shutil
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Optional, Set

from tqdm import tqdm

from code_analyzer import CodeAnalyzer
from config import EXCLUDE_DIRS

# Subject-line prefixes (compared case-insensitively) that mark a fix commit:
# conventional commits ("fix:", "fix!:", "fix(scope):"), issue references
# ("fix #123", "fixes #123") and verb forms ("fixed ...", "fixing ...").
_FIX_PREFIXES = ("fix:", "fix!:", "fix(", "fix ", "fixes", "fixed", "fixing")

# Substrings that mark a fix commit wherever they occur in the subject line.
_FIX_KEYWORDS = ("bugfix", "bug fix")

# Full object names: 40 hex digits (SHA-1) or 64 hex digits (SHA-256 repos).
_COMMIT_HASH_RE = re.compile(r"[0-9a-f]{40}(?:[0-9a-f]{24})?")


class DataCollector:
    """Collects and combines code metrics with git commit data."""

    def __init__(self, code_analyzer: CodeAnalyzer):
        """Store the analyzer; the temporary clone directory is created lazily."""
        self.code_analyzer = code_analyzer
        self.temp_dir: Optional[str] = None

    def clone_repository(self, owner: str, repo: str) -> Optional[str]:
        """Clone a GitHub repository with full history into the temp directory.

        Args:
            owner: GitHub account or organisation name.
            repo: Repository name; also used as the checkout directory name.

        Returns:
            Path of the fresh checkout, or ``None`` if cloning failed.
        """
        if not self.temp_dir:
            self.temp_dir = tempfile.mkdtemp()

        repo_url = f"https://github.com/{owner}/{repo}.git"
        clone_path = os.path.join(self.temp_dir, repo)

        try:
            # Start from a clean directory: git refuses to clone into a
            # non-empty one.
            if os.path.exists(clone_path):
                shutil.rmtree(clone_path)

            # Full history (no --depth) is needed for commit analysis.
            subprocess.run(
                ['git', 'clone', repo_url, clone_path],
                check=True,
                capture_output=True,
                timeout=600,
            )
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Error cloning {owner}/{repo}: {e}")
            # Do not leave a partial checkout behind after a failure/timeout.
            shutil.rmtree(clone_path, ignore_errors=True)
            return None
        return clone_path

    @staticmethod
    def _is_fix_message(subject: str) -> bool:
        """Return True if a commit subject line looks like a bug-fix commit."""
        lowered = subject.strip().lower()
        return lowered.startswith(_FIX_PREFIXES) or any(
            keyword in lowered for keyword in _FIX_KEYWORDS
        )

    @staticmethod
    def _changed_python_files(repo_path: str, commit_hash: str) -> List[str]:
        """Return the ``.py`` paths (repo-relative) touched by one commit.

        Failures are reported as a warning and yield an empty list so that a
        single unreadable commit does not abort the whole analysis.
        """
        try:
            shown = subprocess.run(
                ['git', 'show', '--name-only', '--pretty=format:', commit_hash],
                cwd=repo_path,
                capture_output=True,
                # Decode explicitly: the locale codec may choke on git output.
                encoding='utf-8',
                errors='replace',
                timeout=60,
            )
        except (subprocess.SubprocessError, OSError) as e:
            print(f" Warning: could not list files for commit {commit_hash[:8]}: {e}")
            return []

        if shown.returncode != 0:
            return []

        names = (line.strip() for line in shown.stdout.split('\n'))
        return [name for name in names if name.endswith('.py')]

    def get_fix_commits(self, repo_path: str) -> Dict[str, Set[str]]:
        """Map each Python file to the fix commits that changed it.

        Runs ``git log --all`` in ``repo_path``, keeps the commits whose
        subject line matches a fix pattern (``fix:``, ``fix(scope):``,
        ``fixes #1``, ``fixed``, ``fixing``, ``bugfix``, ``bug fix``;
        case-insensitive) and then asks git which ``.py`` files each of those
        commits touched.

        Args:
            repo_path: Path of a local git checkout.

        Returns:
            Dictionary mapping repo-relative file paths (as printed by git) to
            the set of fix-commit hashes that changed them. Empty when git
            fails or no fix commit is found.
        """
        file_fix_commits: Dict[str, Set[str]] = {}

        try:
            result = subprocess.run(
                ['git', 'log', '--all', '--pretty=format:%H|%s'],
                cwd=repo_path,
                capture_output=True,
                # A single undecodable subject must not discard every result.
                encoding='utf-8',
                errors='replace',
                timeout=300,
            )
        except (subprocess.SubprocessError, OSError) as e:
            print(f" Warning: Error analyzing commits: {e}")
            return file_fix_commits

        if result.returncode != 0:
            print(f" Warning: git log failed: {result.stderr}")
            return file_fix_commits

        fix_commit_hashes: List[str] = []
        for line in result.stdout.split('\n'):
            # Split on the first '|' only; the subject itself may contain '|'.
            commit_hash, separator, subject = line.partition('|')
            commit_hash = commit_hash.strip()
            if (
                separator
                and _COMMIT_HASH_RE.fullmatch(commit_hash)
                and self._is_fix_message(subject)
            ):
                fix_commit_hashes.append(commit_hash)

        print(f" Found {len(fix_commit_hashes)} fix commits")
        if not fix_commit_hashes:
            return file_fix_commits

        # One `git show` per commit; the work is subprocess-bound, so threads
        # overlap the waiting time.
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = {
                executor.submit(self._changed_python_files, repo_path, commit_hash): commit_hash
                for commit_hash in fix_commit_hashes
            }
            for future in as_completed(futures):
                commit_hash = futures[future]
                try:
                    changed_files = future.result()
                except Exception as e:
                    print(f" Warning: Error processing commit {commit_hash[:8]}: {e}")
                    continue
                for file_path in changed_files:
                    file_fix_commits.setdefault(file_path, set()).add(commit_hash)

        return file_fix_commits

    @staticmethod
    def _metric_relative_path(metric: Dict, repo_base: Path) -> Optional[str]:
        """Return the metric's repo-relative POSIX path, or None if unknown.

        ``relative_path`` is preferred; an absolute ``relative_path`` or
        ``file_path`` is accepted only when it lies inside ``repo_base``.
        """
        relative = metric.get('relative_path')
        if relative and not Path(relative).is_absolute():
            return Path(relative).as_posix()

        for key in ('relative_path', 'file_path'):
            raw = metric.get(key)
            if not raw or not Path(raw).is_absolute():
                continue
            try:
                return Path(raw).relative_to(repo_base).as_posix()
            except ValueError:
                # Not located under the repository root.
                continue
        return None

    def count_fixes_per_file(self, repo_path: str, code_metrics: List[Dict]) -> None:
        """Annotate every metric dict with the number of fix commits.

        Sets ``fix_count`` and its alias ``total_fixes`` on each dictionary in
        ``code_metrics`` (in place). Files are matched to git paths by their
        exact repo-relative path. Only when no such path can be derived from
        the metric is the bare ``filename`` used, and then only if exactly one
        fixed file has that name, so unrelated files never inherit counts.

        Args:
            repo_path: Path of the local git checkout.
            code_metrics: Metric dictionaries; the keys ``relative_path``,
                ``file_path`` and ``filename`` are read when present.
        """
        print(" Analyzing git commit logs for semantic fix commits...")
        file_fix_commits = self.get_fix_commits(repo_path)

        if not file_fix_commits:
            print(" No fix commits found matching semantic commit formats")
            for metric in code_metrics:
                metric['fix_count'] = 0
                metric['total_fixes'] = 0
            return

        # git prints repo-relative paths with forward slashes on every OS.
        fix_counts: Dict[str, int] = {
            path: len(commits) for path, commits in file_fix_commits.items()
        }
        print(f" Found {len(fix_counts)} files with fix commits")

        # Index by basename for the fallback used when a metric has no path.
        paths_by_basename: Dict[str, List[str]] = {}
        for path in fix_counts:
            paths_by_basename.setdefault(path.rsplit('/', 1)[-1], []).append(path)

        repo_base = Path(repo_path)
        for metric in tqdm(code_metrics, desc=" Matching files with fixes"):
            fix_count = 0
            relative_path = self._metric_relative_path(metric, repo_base)
            if relative_path is not None:
                fix_count = fix_counts.get(relative_path, 0)
            else:
                candidates = paths_by_basename.get(metric.get('filename') or '', [])
                if len(candidates) == 1:
                    fix_count = fix_counts[candidates[0]]

            metric['fix_count'] = fix_count
            metric['total_fixes'] = fix_count  # Alias for consistency

    def collect_repository_data(
        self,
        owner: str,
        repo: str,
        parallel_files: bool = True,
        max_workers: Optional[int] = None,
    ) -> Optional[Dict]:
        """Clone a repository and collect its code metrics and fix counts.

        Args:
            owner: GitHub account or organisation name.
            repo: Repository name.
            parallel_files: Forwarded to the analyzer as ``parallel``.
            max_workers: Forwarded to the analyzer as ``max_workers``.

        Returns:
            ``None`` if cloning fails or the analyzer returns no metrics;
            otherwise a dict with the keys ``owner``, ``repo``,
            ``code_metrics`` and ``total_fixes`` (the sum of the per-file
            ``fix_count`` values, so a commit touching several files is
            counted once per file).
        """
        print(f"\nCollecting data for {owner}/{repo}...")

        repo_path = self.clone_repository(owner, repo)
        if not repo_path:
            return None

        print(" Analyzing code metrics...")
        # NOTE(review): assumes analyze_directory returns a list of dicts that
        # each carry a 'file_path' key - confirm against CodeAnalyzer.
        code_metrics = self.code_analyzer.analyze_directory(
            repo_path, parallel=parallel_files, max_workers=max_workers
        )

        if not code_metrics:
            print(f" No Python files found in {owner}/{repo}")
            return None

        # Record repo-relative POSIX paths so they compare equal to git output.
        repo_base = Path(repo_path)
        for metric in code_metrics:
            try:
                metric['relative_path'] = (
                    Path(metric['file_path']).relative_to(repo_base).as_posix()
                )
            except ValueError:
                # Path lies outside the checkout; keep it unchanged.
                metric['relative_path'] = metric['file_path']

        self.count_fixes_per_file(repo_path, code_metrics)

        total_fixes = sum(metric.get('fix_count', 0) for metric in code_metrics)

        return {
            'owner': owner,
            'repo': repo,
            'code_metrics': code_metrics,
            'total_fixes': total_fixes,
        }

    def cleanup(self):
        """Delete the temporary directory (and every clone inside it), if any."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            self.temp_dir = None