diff options
| -rw-r--r-- | .gitignore | 33 | ||||
| -rw-r--r-- | README.md | 94 | ||||
| -rw-r--r-- | code_analyzer.py | 213 | ||||
| -rw-r--r-- | config.py | 64 | ||||
| -rw-r--r-- | data_collector.py | 251 | ||||
| -rw-r--r-- | github_client.py | 131 | ||||
| -rw-r--r-- | main.py | 438 | ||||
| -rw-r--r-- | requirements.txt | 13 | ||||
| -rw-r--r-- | statistical_analysis.py | 553 | ||||
| -rw-r--r-- | visualizer.py | 255 |
10 files changed, 2045 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27755d4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +.venv + +# Project specific +results/ +figures/ +*.csv +*.json +.env + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Temporary +tmp/ +temp/ +*.tmp + diff --git a/README.md b/README.md new file mode 100644 index 0000000..83b74d4 --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# Code Metrics Analysis Project + +This project analyzes code metrics from open-source Python projects on GitHub to investigate relationships between code complexity and issues/fixes. + +## Features + +- **Code Metrics Analysis**: Measures LOC, cyclomatic complexity, cognitive complexity, inheritance depth, and maintainability index +- **Git Commit Analysis**: Analyzes git commit logs to find commits with "fix" in the message and tracks which files were changed +- **Statistical Analysis**: + - Correlation analysis (Pearson and Spearman) + - Linear regression modeling + - Hypothesis testing (ANOVA, Kruskal-Wallis) + - Confidence intervals + - Variance-covariance analysis + - Pivot tables + - Discrete distribution analysis +- **Visualizations**: Creates comprehensive plots and charts + +## Setup + +1. **Install dependencies**: + +```bash +pip install -r requirements.txt +``` + +2. **Set up GitHub API token** (optional but recommended): + - Create a `.env` file in the project root + - Add your GitHub token: `GITHUB_TOKEN=your_token_here` + - Get a token from: https://github.com/settings/tokens + +## Usage + +Run the main analysis script: + +```bash +python main.py +``` + +The script will: + +1. Use a curated list of popular Python projects that use semantic commits +2. Clone repositories with full git history +3. Analyze code metrics for all Python files +4. 
Parse git commit logs to find "fix" commits (using semantic commit formats like "fix:", "fix(scope):", etc.) and track changed files +5. Perform statistical analysis +6. Generate visualizations +7. Save results to `results/` and `figures/` directories + +## Configuration + +Edit `config.py` to customize: + +- Number of repositories to analyze (`MAX_REPOSITORIES`) +- Minimum stars for repository selection (`MIN_STARS`) +- Excluded directories (`EXCLUDE_DIRS`) +- Statistical significance level (`SIGNIFICANCE_LEVEL`) +- Confidence level (`CONFIDENCE_LEVEL`) + +## Output + +- `results/raw_metrics.csv`: All collected code metrics +- `results/analysis_results.json`: Statistical analysis results +- `figures/`: Various visualization plots + +## Project Structure + +- `main.py`: Main orchestration script +- `github_client.py`: GitHub API client +- `code_analyzer.py`: Code metrics analyzer +- `data_collector.py`: Data collection pipeline +- `statistical_analysis.py`: Statistical analysis functions +- `visualizer.py`: Visualization functions +- `config.py`: Configuration settings + +## Requirements + +- Python 3.8+ +- Git (for cloning repositories) +- GitHub API token (optional, increases rate limits) + +## Notes + +- The analysis focuses on popular Python projects that use semantic commits +- Fix detection recognizes semantic commit formats: + - `fix:` (conventional commits) + - `fix(scope):` (conventional commits with scope) + - `Fix:`, `FIX:` (case variations) + - `fixes #123`, `fix #123` (issue references) + - `fixed`, `fixing`, `bugfix`, `bug fix` (variations) +- Only Python files (.py) are tracked for fix commits +- Full git history is cloned (not shallow) to analyze all commits +- Temporary cloned repositories are cleaned up after analysis +- The curated repository list can be modified in `config.py` diff --git a/code_analyzer.py b/code_analyzer.py new file mode 100644 index 0000000..5b0b313 --- /dev/null +++ b/code_analyzer.py @@ -0,0 +1,213 @@ +""" +Code metrics 
analyzer for Python files.
Analyzes LOC, complexity, and other metrics.
"""
import os
import ast
from typing import Dict, List, Optional
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import radon.complexity as radon_complexity
from radon.metrics import mi_visit
from radon.raw import analyze


class CodeAnalyzer:
    """Analyzer for Python code metrics.

    Computes per-file size metrics (via radon.raw), cyclomatic complexity
    (via radon.complexity), an approximated cognitive complexity (via lizard,
    when available), maintainability index, and AST-based inheritance depth.
    """

    def __init__(self, exclude_dirs: Optional[List[str]] = None):
        # Directory names (path components) that disqualify a file from analysis.
        self.exclude_dirs = exclude_dirs or []

    def should_analyze(self, file_path: str) -> bool:
        """Check if a file should be analyzed.

        Returns True only for ``.py`` files whose path contains none of the
        excluded directory names as an exact path component.
        """
        path = Path(file_path)

        # Check if in excluded directory
        for part in path.parts:
            if part in self.exclude_dirs:
                return False

        # Check if Python file
        return path.suffix == '.py'

    def analyze_file(self, file_path: str) -> Optional[Dict]:
        """Analyze a single Python file and return metrics.

        Returns a dict of metric name -> value, or None when the file is
        excluded, empty, or unreadable. Individual metric groups fail soft:
        each is wrapped in its own try/except and defaults to 0 on error.
        """
        if not self.should_analyze(file_path):
            return None

        try:
            # errors='ignore' silently drops undecodable bytes so one bad
            # file cannot abort the whole repository scan.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if not code.strip():
                return None

            metrics = {}

            # Lines of Code (LOC) — radon.raw.analyze raises on syntax it
            # cannot tokenize; that is caught by the outer handler.
            raw_metrics = analyze(code)
            metrics['loc'] = raw_metrics.loc
            metrics['lloc'] = raw_metrics.lloc  # Logical lines of code
            metrics['sloc'] = raw_metrics.sloc  # Source lines of code
            metrics['comments'] = raw_metrics.comments
            metrics['blank_lines'] = raw_metrics.blank

            # Cyclomatic Complexity using Radon
            try:
                complexity_results = radon_complexity.cc_visit(code)
                total_complexity = sum(func.complexity for func in complexity_results)
                avg_complexity = (total_complexity / len(complexity_results)
                                  if complexity_results else 0)
                max_complexity = max((func.complexity for func in complexity_results),
                                     default=0)

                metrics['cyclomatic_complexity'] = total_complexity
                metrics['avg_complexity'] = avg_complexity
                metrics['max_complexity'] = max_complexity
                metrics['functions'] = len(complexity_results)
            # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit;
            # prefer `except Exception` here.
            except:
                metrics['cyclomatic_complexity'] = 0
                metrics['avg_complexity'] = 0
                metrics['max_complexity'] = 0
                metrics['functions'] = 0

            # Cognitive Complexity calculation
            # Note: Lizard doesn't provide cognitive complexity directly
            # We calculate it based on cyclomatic complexity + nesting depth penalty
            # Cognitive complexity penalizes nesting more heavily than cyclomatic complexity
            try:
                # Imported lazily so lizard stays an optional dependency.
                import lizard
                lizard_result = lizard.analyze_file(file_path)

                if lizard_result and lizard_result.function_list:
                    total_cognitive = 0
                    for func in lizard_result.function_list:
                        # Get cyclomatic complexity (base)
                        base_cc = getattr(func, 'cyclomatic_complexity', 1)
                        # Get nesting depth (cognitive complexity penalizes nesting)
                        # NOTE(review): assumes lizard exposes `max_nesting_depth`
                        # (nesting-depth extension); defaults to 0 otherwise — confirm.
                        nesting_depth = getattr(func, 'max_nesting_depth', 0)
                        # Cognitive complexity formula: CC + (nesting_depth * 2)
                        # This approximates cognitive complexity by penalizing deep nesting
                        cognitive = base_cc + (nesting_depth * 2)
                        total_cognitive += max(0, cognitive)  # Ensure non-negative

                    metrics['cognitive_complexity'] = total_cognitive
                    metrics['avg_cognitive_complexity'] = (
                        total_cognitive / len(lizard_result.function_list)
                        if lizard_result.function_list else 0
                    )
                else:
                    # No functions found, set to 0
                    metrics['cognitive_complexity'] = 0
                    metrics['avg_cognitive_complexity'] = 0
            except Exception as e:
                # If lizard fails (or is not installed), fall back to the
                # cyclomatic numbers so the column is never missing.
                metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0)
                metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0)

            # Maintainability Index (radon mi_visit; multi=True counts
            # multi-line strings as comments)
            try:
                metrics['maintainability_index'] = mi_visit(code, multi=True)
            except:
                metrics['maintainability_index'] = 0

            # Depth of Inheritance (for classes), resolved within this file only
            try:
                tree = ast.parse(code)
                max_depth = self._calculate_inheritance_depth(tree)
                metrics['max_inheritance_depth'] = max_depth
                metrics['classes'] = len([node for node in ast.walk(tree)
                                          if isinstance(node, ast.ClassDef)])
            except:
                metrics['max_inheritance_depth'] = 0
                metrics['classes'] = 0

            # File path components for module analysis
            path_parts = Path(file_path).parts
            metrics['file_path'] = file_path
            # 'module' is simply the parent directory name (or 'root')
            metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root'
            metrics['filename'] = path_parts[-1]

            return metrics

        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
            return None

    def _calculate_inheritance_depth(self, tree: ast.AST) -> int:
        """Calculate maximum inheritance depth over all classes in the AST.

        Returns 0 for a module with no classes.
        """
        max_depth = 0

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                depth = self._get_class_depth(node, tree)
                max_depth = max(max_depth, depth)

        return max_depth

    def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST) -> int:
        """Get inheritance depth for a specific class (1 = no bases).

        Only resolves base classes defined in the same file by simple name;
        imported bases contribute depth 0.
        NOTE(review): re-walks the whole tree per class (O(n^2) in class
        count) — acceptable for single files, but worth caching if hot.
        """
        if not class_node.bases:
            return 1

        max_base_depth = 0
        for base in class_node.bases:
            if isinstance(base, ast.Name):
                # Find the base class definition
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef) and node.name == base.id:
                        base_depth = self._get_class_depth(node, tree)
                        max_base_depth = max(max_base_depth, base_depth)
                        break

        return max_base_depth + 1

    def analyze_directory(self, directory: str, parallel: bool = True, max_workers: Optional[int] = None) -> List[Dict]:
        """Analyze all Python files in a directory recursively.

        When ``parallel`` is True and more than one file qualifies, files are
        analyzed in a ProcessPoolExecutor (results arrive unordered).
        """
        directory_path = Path(directory)

        # Collect all Python files to analyze
        files_to_analyze = [
            str(file_path) for file_path in directory_path.rglob('*.py')
            if self.should_analyze(str(file_path))
        ]

        if not files_to_analyze:
            return []

        if parallel and len(files_to_analyze) > 1:
            # Parallel analysis
            metrics_list = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # Bound methods don't pickle cleanly to workers, so a
                # module-level wrapper rebuilds the analyzer per call.
                analyze_func = partial(_analyze_file_wrapper, exclude_dirs=self.exclude_dirs)
                futures = {executor.submit(analyze_func, file_path): file_path
                           for file_path in files_to_analyze}

                for future in as_completed(futures):
                    try:
                        metrics = future.result()
                        if metrics:
                            metrics_list.append(metrics)
                    except Exception as e:
                        file_path = futures[future]
                        print(f" Warning: Error analyzing {file_path}: {e}")

            return metrics_list
        else:
            # Sequential analysis
            metrics_list = []
            for file_path in files_to_analyze:
                metrics = self.analyze_file(file_path)
                if metrics:
                    metrics_list.append(metrics)
            return metrics_list


def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]:
    """Wrapper function for parallel file analysis.

    Module-level (picklable) entry point for worker processes; constructs a
    fresh CodeAnalyzer per call.
    """
    analyzer = CodeAnalyzer(exclude_dirs=exclude_dirs)
    return analyzer.analyze_file(file_path)

diff --git a/config.py b/config.py
new file mode 100644
index 0000000..40fbedf
--- /dev/null
+++ b/config.py
@@ -0,0 +1,64 @@
"""
Configuration file for the code metrics analysis project.
"""
import os
from dotenv import load_dotenv

# Pull GITHUB_TOKEN (and any other settings) from a local .env file, if present.
load_dotenv()

# GitHub API Configuration
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN', '')  # Set your token in .env file; empty = unauthenticated (low rate limit)
GITHUB_API_BASE = 'https://api.github.com'

# Analysis Configuration
MAX_REPOSITORIES = 10  # Limit number of repos to analyze
MIN_STARS = 100  # Minimum stars for repository selection
PYTHON_FILE_EXTENSIONS = ['.py']
# Path components that exclude a file from analysis (matched exactly, per component).
EXCLUDE_DIRS = ['__pycache__', '.git', 'venv', 'env', '.venv', 'node_modules', 'tests', 'test']

# Curated list of popular Python projects that use semantic commits
# Format: (owner, repo_name)
CURATED_REPOSITORIES = [
    # FastAPI - modern web framework
    ('tiangolo', 'fastapi'),
    # Requests - HTTP library
    ('psf', 'requests'),
    # Django REST Framework - API framework
    ('encode', 'djangorestframework'),
    # Flask - web framework
    ('pallets', 'flask'),
    # Celery - distributed task queue
    ('celery', 'celery'),
    # Pydantic - data validation
    ('pydantic', 'pydantic'),
    # SQLAlchemy - SQL toolkit
    ('sqlalchemy', 'sqlalchemy'),
    # Pandas - data analysis
    ('pandas-dev', 'pandas'),
    # NumPy - numerical computing
    ('numpy', 'numpy'),
    # Scikit-learn - machine learning
    ('scikit-learn', 'scikit-learn'),
]

# Statistical Analysis Configuration
SIGNIFICANCE_LEVEL = 0.05  # alpha for hypothesis tests
CONFIDENCE_LEVEL = 0.95  # for confidence intervals

# Output Configuration
OUTPUT_DIR = 'results'
FIGURES_DIR = 'figures'

# Data Loading Configuration
USE_EXISTING_METRICS = False  # If True, load from existing raw_metrics.csv instead of collecting new data
# Set to False to recollect data with fixed cognitive_complexity calculation
RAW_METRICS_FILE = 'results/raw_metrics.csv'  # Path to existing raw metrics CSV file

# Analysis Mode Configuration
FOCUSED_MODE = True  # If True, only perform regression analysis and hypothesis testing (t-tests, z-tests)

# Parallelization Configuration
MAX_WORKERS = None  # None = use CPU count, or set to specific number
PARALLEL_REPOS = True  # Process repositories in parallel
PARALLEL_FILES = True  # Analyze files in parallel

diff --git a/data_collector.py b/data_collector.py
new file mode 100644
index 0000000..90b9416
--- /dev/null
+++ b/data_collector.py
@@ -0,0 +1,251 @@
"""
Data collection pipeline that combines code metrics with git commit log data.
"""
import os
import subprocess
import shutil
import tempfile
import re
from typing import List, Dict, Optional, Set
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from code_analyzer import CodeAnalyzer
from config import EXCLUDE_DIRS


class DataCollector:
    """Collects and combines code metrics with git commit data.

    Clones repositories into a shared temp directory, runs CodeAnalyzer over
    them, and joins per-file metrics with counts of "fix" commits mined from
    `git log`. Call cleanup() when done to remove the clones.
    """

    def __init__(self, code_analyzer: CodeAnalyzer):
        self.code_analyzer = code_analyzer
        # Lazily-created shared temp root for all clones; removed by cleanup().
        self.temp_dir = None

    def clone_repository(self, owner: str, repo: str) -> Optional[str]:
        """Clone a repository to a temporary directory with full history.

        Returns the clone path, or None on any failure (error is printed).
        """
        if not self.temp_dir:
            self.temp_dir = tempfile.mkdtemp()

        repo_url = f"https://github.com/{owner}/{repo}.git"
        clone_path = os.path.join(self.temp_dir, repo)

        try:
            # Re-clone from scratch if a previous attempt left a directory.
            if os.path.exists(clone_path):
                shutil.rmtree(clone_path)

            # Clone with full history (needed for commit analysis)
            subprocess.run(
                ['git', 'clone', repo_url, clone_path],
                check=True,
                capture_output=True,
                timeout=600
            )
            return clone_path
        except Exception as e:
            print(f"Error cloning {owner}/{repo}: {e}")
            return None

    def get_fix_commits(self, repo_path: str) -> Dict[str, Set[str]]:
        """
        Analyze git commit logs to find commits with 'fix' in the message
        using semantic commit formats (fix:, fix(scope):, Fix:, etc.)
        and track which files were changed in those commits.

        Returns a dictionary mapping file paths (as reported by git, i.e.
        repo-relative with forward slashes) to sets of commit hashes.
        """
        file_fix_commits: Dict[str, Set[str]] = {}

        try:
            # Get all commits - we'll filter for semantic commit formats.
            # --all spans every ref, not just the default branch.
            result = subprocess.run(
                ['git', 'log', '--all', '--pretty=format:%H|%s'],
                cwd=repo_path,
                capture_output=True,
                text=True,
                timeout=300
            )

            if result.returncode != 0:
                print(f" Warning: git log failed: {result.stderr}")
                return file_fix_commits

            # Parse commit hashes and check for semantic commit formats
            fix_commit_hashes = []
            for line in result.stdout.strip().split('\n'):
                if '|' in line:
                    # Split only on the first '|' — subject lines may contain '|'.
                    parts = line.split('|', 1)
                    commit_hash = parts[0].strip()
                    commit_msg = parts[1] if len(parts) > 1 else ''
                    commit_msg_lower = commit_msg.lower()

                    # Check for semantic commit formats:
                    # - fix: (conventional commits)
                    # - fix(scope): (conventional commits with scope)
                    # - Fix:, FIX: (case variations — already covered by the
                    #   lower-cased startswith checks; the two explicit clauses
                    #   below are redundant but harmless)
                    # - fixes #123, fix #123 (issue references)
                    # - fixed, fixing (verb forms)
                    # - bugfix, bug fix (variations)
                    is_fix_commit = (
                        commit_msg_lower.startswith('fix:') or
                        commit_msg_lower.startswith('fix(') or
                        commit_msg_lower.startswith('fixes') or
                        commit_msg_lower.startswith('fix ') or
                        commit_msg_lower.startswith('fixed') or
                        commit_msg_lower.startswith('fixing') or
                        'bugfix' in commit_msg_lower or
                        'bug fix' in commit_msg_lower or
                        (commit_msg.startswith('Fix:') and not commit_msg_lower.startswith('feature')) or
                        (commit_msg.startswith('FIX:') and not commit_msg_lower.startswith('feature'))
                    )

                    # Exclude false positives (like "prefix", "suffix", "affix", etc.)
                    # NOTE(review): this rejects the whole commit if these words
                    # appear ANYWHERE in the message, so a genuine
                    # "fix: handle url prefix" commit is dropped — confirm intent.
                    if is_fix_commit and not any(word in commit_msg_lower for word in
                                                 ['prefix', 'suffix', 'affix', 'transfix', 'crucifix']):
                        # Sanity check: full SHA-1 hashes only.
                        if len(commit_hash) == 40:
                            fix_commit_hashes.append(commit_hash)

            print(f" Found {len(fix_commit_hashes)} fix commits")

            # For each fix commit, get the files that were changed (parallelized)
            def get_commit_files(commit_hash: str) -> List[str]:
                """Get Python files changed in a commit (best-effort; [] on error)."""
                try:
                    file_result = subprocess.run(
                        ['git', 'show', '--name-only', '--pretty=format:', commit_hash],
                        cwd=repo_path,
                        capture_output=True,
                        text=True,
                        timeout=60
                    )
                    if file_result.returncode == 0:
                        return [f.strip() for f in file_result.stdout.strip().split('\n')
                                if f.strip() and f.strip().endswith('.py')]
                # NOTE(review): bare except — prefer `except Exception`.
                except:
                    pass
                return []

            # Process commits in parallel (threads are fine: the work is
            # subprocess I/O, so the GIL is not a bottleneck)
            if fix_commit_hashes:
                with ThreadPoolExecutor(max_workers=10) as executor:
                    futures = {executor.submit(get_commit_files, commit_hash): commit_hash
                               for commit_hash in fix_commit_hashes}

                    for future in as_completed(futures):
                        commit_hash = futures[future]
                        try:
                            changed_files = future.result()
                            for file_path in changed_files:
                                if file_path not in file_fix_commits:
                                    file_fix_commits[file_path] = set()
                                file_fix_commits[file_path].add(commit_hash)
                        except Exception as e:
                            print(f" Warning: Error processing commit {commit_hash[:8]}: {e}")

        except Exception as e:
            print(f" Warning: Error analyzing commits: {e}")

        return file_fix_commits

    def count_fixes_per_file(self, repo_path: str, code_metrics: List[Dict]) -> None:
        """
        Count fix commits for each file in code_metrics.
        Updates the metrics dictionaries in place (adds 'fix_count' and its
        alias 'total_fixes' to every entry).
        """
        print(" Analyzing git commit logs for semantic fix commits...")
        file_fix_commits = self.get_fix_commits(repo_path)

        if not file_fix_commits:
            print(" No fix commits found matching semantic commit formats")
            # Set fix_count to 0 for all files
            for metric in code_metrics:
                metric['fix_count'] = 0
                metric['total_fixes'] = 0
            return

        repo_base = Path(repo_path)

        # Create a mapping from relative paths to fix counts
        fix_counts: Dict[str, int] = {}
        for file_path, commits in file_fix_commits.items():
            # Try to normalize the path
            try:
                abs_path = Path(repo_path) / file_path
                # NOTE(review): files fixed historically but deleted since are
                # silently skipped here (exists() is False and no exception is
                # raised) — confirm that is acceptable.
                if abs_path.exists():
                    rel_path = abs_path.relative_to(repo_base)
                    fix_counts[str(rel_path)] = len(commits)
                    # Also store with forward slashes for matching
                    fix_counts[file_path] = len(commits)
            except:
                fix_counts[file_path] = len(commits)

        print(f" Found {len(fix_counts)} files with fix commits")

        # Match files to fix counts
        for metric in tqdm(code_metrics, desc=" Matching files with fixes"):
            relative_path = metric.get('relative_path', '')
            filename = metric.get('filename', '')
            file_path = metric.get('file_path', '')

            fix_count = 0

            # Try multiple matching strategies: exact relative path, bare
            # filename, then substring fallback (may over-match short names).
            if relative_path in fix_counts:
                fix_count = fix_counts[relative_path]
            elif filename in fix_counts:
                fix_count = fix_counts[filename]
            else:
                # Try matching by filename in the fix_counts keys
                for fix_file, count in fix_counts.items():
                    if filename in fix_file or relative_path in fix_file:
                        fix_count = max(fix_count, count)

            metric['fix_count'] = fix_count
            metric['total_fixes'] = fix_count  # Alias for consistency

    def collect_repository_data(self, owner: str, repo: str, parallel_files: bool = True, max_workers: Optional[int] = None) -> Optional[Dict]:
        """Collect all data for a repository.

        Clones, analyzes metrics, and attaches fix counts. Returns a dict with
        keys 'owner', 'repo', 'code_metrics', 'total_fixes', or None when the
        clone fails or no Python files were found.
        """
        print(f"\nCollecting data for {owner}/{repo}...")

        # Clone repository
        repo_path = self.clone_repository(owner, repo)
        if not repo_path:
            return None

        # Analyze code metrics (parallelized)
        print(" Analyzing code metrics...")
        code_metrics = self.code_analyzer.analyze_directory(repo_path, parallel=parallel_files, max_workers=max_workers)

        if not code_metrics:
            print(f" No Python files found in {owner}/{repo}")
            return None

        # Map file paths to relative paths for matching (forward slashes, to
        # line up with git's path output on Windows too)
        repo_base = Path(repo_path)
        for metric in code_metrics:
            file_path = Path(metric['file_path'])
            try:
                relative_path = file_path.relative_to(repo_base)
                metric['relative_path'] = str(relative_path).replace('\\', '/')
            except:
                metric['relative_path'] = metric['file_path']

        # Count fix commits per file
        self.count_fixes_per_file(repo_path, code_metrics)

        # Get total fix commits count (sum of per-file counts; a commit
        # touching several files is counted once per file)
        total_fixes = sum(metric.get('fix_count', 0) for metric in code_metrics)

        return {
            'owner': owner,
            'repo': repo,
            'code_metrics': code_metrics,
            'total_fixes': total_fixes
        }

    def cleanup(self):
        """Clean up temporary directories (all clones made by this collector)."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            self.temp_dir = None

diff --git a/github_client.py b/github_client.py
new file mode 100644
index 0000000..153ca39
--- /dev/null
+++ b/github_client.py
@@ -0,0 +1,131 @@
"""
GitHub API client for fetching repository data, issues, and pull requests.
"""
import requests
import time
from typing import List, Dict, Optional
from config import GITHUB_TOKEN, GITHUB_API_BASE


class GitHubClient:
    """Client for interacting with GitHub API.

    Thin wrapper around a requests.Session with optional token auth and a
    single retry on rate-limit responses.
    """

    def __init__(self, token: Optional[str] = None):
        self.token = token or GITHUB_TOKEN
        self.headers = {
            'Accept': 'application/vnd.github.v3+json',
        }
        if self.token:
            self.headers['Authorization'] = f'token {self.token}'
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def _make_request(self, url: str, params: Optional[Dict] = None) -> Dict:
        """Make a request to GitHub API with rate limiting.

        On a 403 rate-limit response, sleeps until X-RateLimit-Reset and
        retries once. Raises requests.HTTPError for other error statuses.
        """
        response = self.session.get(url, params=params)

        # Handle rate limiting
        if response.status_code == 403 and 'rate limit' in response.text.lower():
            # Fall back to a 60s wait if the reset header is missing.
            reset_time = int(response.headers.get('X-RateLimit-Reset', time.time() + 60))
            wait_time = max(0, reset_time - int(time.time()))
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
            response = self.session.get(url, params=params)

        response.raise_for_status()
        return response.json()

    def search_repositories(self, query: str, sort: str = 'stars', order: str = 'desc',
                            per_page: int = 10) -> List[Dict]:
        """Search for repositories matching the query (first page only)."""
        url = f"{GITHUB_API_BASE}/search/repositories"
        params = {
            'q': query,
            'sort': sort,
            'order': order,
            'per_page': per_page
        }
        results = self._make_request(url, params=params)
        return results.get('items', [])

    def get_repository_info(self, owner: str, repo: str) -> Dict:
        """Get detailed information about a repository."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}"
        return self._make_request(url)

    def get_repository_issues(self, owner: str, repo: str, state: str = 'all',
                              per_page: int = 100) -> List[Dict]:
        """Get all issues for a repository (paginated; PRs filtered out)."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/issues"
        all_issues = []
        page = 1

        while True:
            params = {
                'state': state,
                'per_page': per_page,
                'page': page
            }
            issues = self._make_request(url, params=params)

            if not issues:
                break

            # Filter out pull requests (they appear in issues endpoint)
            issues = [issue for issue in issues if 'pull_request' not in issue]
            all_issues.extend(issues)

            # NOTE(review): this length check runs AFTER the PR filter, so a
            # full page containing any PRs looks short and pagination stops
            # early — the raw page length should be tested instead. Confirm.
            if len(issues) < per_page:
                break

            page += 1
            time.sleep(0.5)  # Be nice to the API

        return all_issues

    def get_repository_pulls(self, owner: str, repo: str, state: str = 'all',
                             per_page: int = 100) -> List[Dict]:
        """Get all pull requests for a repository (paginated)."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/pulls"
        all_pulls = []
        page = 1

        while True:
            params = {
                'state': state,
                'per_page': per_page,
                'page': page
            }
            pulls = self._make_request(url, params=params)

            if not pulls:
                break

            all_pulls.extend(pulls)

            # A short page means this was the last one.
            if len(pulls) < per_page:
                break

            page += 1
            time.sleep(0.5)

        return all_pulls

    def get_file_issues(self, owner: str, repo: str, file_path: str) -> Dict:
        """Get issues and PRs related to a specific file.

        Uses the search API (text match on the file path); returns counts
        only, and {'issues': 0, 'pulls': 0} on any error. Search results are
        capped at one page (per_page=100).
        """
        # Search issues mentioning the file
        query = f'repo:{owner}/{repo} {file_path}'
        url = f"{GITHUB_API_BASE}/search/issues"
        params = {'q': query, 'per_page': 100}

        try:
            results = self._make_request(url, params=params)
            return {
                'issues': len([item for item in results.get('items', [])
                               if 'pull_request' not in item]),
                'pulls': len([item for item in results.get('items', [])
                              if 'pull_request' in item])
            }
        # NOTE(review): bare except — prefer `except Exception`.
        except:
            return {'issues': 0, 'pulls': 0}

@@ -0,0 +1,438 @@
"""
Main analysis script for code metrics analysis project.
Orchestrates data collection, analysis, and visualization.
+""" +import pandas as pd +import numpy as np +import json +from pathlib import Path +from typing import Optional, Dict +from concurrent.futures import ThreadPoolExecutor, as_completed +from github_client import GitHubClient +from code_analyzer import CodeAnalyzer +from data_collector import DataCollector +from statistical_analysis import StatisticalAnalyzer +from visualizer import Visualizer +from config import ( + MAX_REPOSITORIES, MIN_STARS, EXCLUDE_DIRS, + OUTPUT_DIR, FIGURES_DIR, SIGNIFICANCE_LEVEL, CONFIDENCE_LEVEL, + CURATED_REPOSITORIES, MAX_WORKERS, PARALLEL_REPOS, PARALLEL_FILES, + USE_EXISTING_METRICS, RAW_METRICS_FILE, FOCUSED_MODE +) + + +def main(): + """Main analysis pipeline.""" + print("=" * 80) + print("Code Metrics Analysis Pipeline") + print("=" * 80) + + # Create output directories + Path(OUTPUT_DIR).mkdir(exist_ok=True) + Path(FIGURES_DIR).mkdir(exist_ok=True) + + # Initialize components + print("\n1. Initializing components...") + stat_analyzer = StatisticalAnalyzer( + significance_level=SIGNIFICANCE_LEVEL, + confidence_level=CONFIDENCE_LEVEL + ) + visualizer = Visualizer(output_dir=FIGURES_DIR) + + # Check if we should use existing metrics + raw_metrics_path = Path(RAW_METRICS_FILE) + use_existing = USE_EXISTING_METRICS and raw_metrics_path.exists() + + if use_existing: + print(f"\n2. 
Loading existing raw metrics from {RAW_METRICS_FILE}...") + try: + df = pd.read_csv(raw_metrics_path) + print(f" ✓ Loaded {len(df)} file metrics from existing data") + all_metrics = df.to_dict('records') + repo_summaries = [] # We don't have repo summaries from CSV + except Exception as e: + print(f" ✗ Error loading existing metrics: {e}") + print(" Falling back to data collection...") + use_existing = False + + if not use_existing: + # Initialize data collection components + github_client = GitHubClient() + code_analyzer = CodeAnalyzer(exclude_dirs=EXCLUDE_DIRS) + data_collector = DataCollector(code_analyzer) + + # Use curated list of popular Python repositories + print("\n2. Using curated list of popular Python repositories...") + repositories_to_analyze = CURATED_REPOSITORIES[:MAX_REPOSITORIES] + print(f"Selected {len(repositories_to_analyze)} repositories:") + for owner, repo_name in repositories_to_analyze: + print(f" - {owner}/{repo_name}") + + # Collect data from repositories (parallelized) + print("\n3. 
Collecting data from repositories...") + all_metrics = [] + repo_summaries = [] + + def process_repository(owner: str, repo_name: str) -> Optional[Dict]: + """Process a single repository and return results.""" + try: + print(f" Processing {owner}/{repo_name}...") + + # Get repository info for stars count + try: + repo_info = github_client.get_repository_info(owner, repo_name) + stars = repo_info.get('stargazers_count', 0) + except: + stars = 0 + + repo_data = data_collector.collect_repository_data( + owner, repo_name, + parallel_files=PARALLEL_FILES, + max_workers=MAX_WORKERS + ) + + if repo_data and repo_data['code_metrics']: + print(f" ✓ {owner}/{repo_name}: {len(repo_data['code_metrics'])} files, {repo_data['total_fixes']} fixes") + return { + 'metrics': repo_data['code_metrics'], + 'summary': { + 'owner': owner, + 'repo': repo_name, + 'files_analyzed': len(repo_data['code_metrics']), + 'total_fixes': repo_data['total_fixes'], + 'stars': stars + } + } + else: + print(f" ✗ {owner}/{repo_name}: No data collected") + return None + except Exception as e: + print(f" ✗ {owner}/{repo_name}: Error - {e}") + import traceback + traceback.print_exc() + return None + + if PARALLEL_REPOS and len(repositories_to_analyze) > 1: + # Process repositories in parallel + print(f" Processing {len(repositories_to_analyze)} repositories in parallel...") + with ThreadPoolExecutor(max_workers=min(len(repositories_to_analyze), MAX_WORKERS or 4)) as executor: + futures = {executor.submit(process_repository, owner, repo_name): (owner, repo_name) + for owner, repo_name in repositories_to_analyze} + + for future in as_completed(futures): + owner, repo_name = futures[future] + try: + result = future.result() + if result: + all_metrics.extend(result['metrics']) + repo_summaries.append(result['summary']) + except Exception as e: + print(f" ✗ {owner}/{repo_name}: Failed - {e}") + else: + # Sequential processing + for i, (owner, repo_name) in enumerate(repositories_to_analyze, 1): + 
print(f"\n[{i}/{len(repositories_to_analyze)}] Processing {owner}/{repo_name}...") + result = process_repository(owner, repo_name) + if result: + all_metrics.extend(result['metrics']) + repo_summaries.append(result['summary']) + + if not all_metrics: + print("\nNo metrics collected. Exiting.") + return + + # Convert to DataFrame + print("\n4. Preparing data for analysis...") + df = stat_analyzer.prepare_dataframe(all_metrics) + print(f"Total files analyzed: {len(df)}") + print(f"Columns: {list(df.columns)}") + + # Save raw data + df.to_csv(Path(OUTPUT_DIR) / 'raw_metrics.csv', index=False) + print(f" ✓ Saved raw metrics to {OUTPUT_DIR}/raw_metrics.csv") + else: + # Already have DataFrame from CSV + print("\n3. Preparing data for analysis...") + df = stat_analyzer.prepare_dataframe(all_metrics) + print(f"Total files analyzed: {len(df)}") + print(f"Columns: {list(df.columns)}") + + if len(df) == 0: + print("\nNo metrics available for analysis. Exiting.") + return + + # Statistical Analysis + step_num = "4" if use_existing else "5" + print(f"\n{step_num}. 
Performing statistical analysis...") + + if FOCUSED_MODE: + print(" [FOCUSED MODE: Regression and Hypothesis Testing Only]") + + # Regression analysis + print(" - Regression analysis...") + complexity_features = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity', 'max_inheritance_depth' + ] + regression_results = stat_analyzer.regression_analysis( + df, complexity_features, target='fix_count' + ) + + # Hypothesis testing - t-tests + print(" - T-test analysis...") + t_test_results = stat_analyzer.t_test_analysis(df) + + # Hypothesis testing - z-tests + print(" - Z-test analysis...") + z_test_results = stat_analyzer.z_test_analysis(df) + + # Hypothesis testing - module comparisons (ANOVA) + print(" - Hypothesis testing (ANOVA)...") + hypothesis_results = stat_analyzer.hypothesis_testing(df) + + # Confidence intervals for key metrics + print(" - Confidence intervals...") + ci_results = stat_analyzer.confidence_intervals( + df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count'] + ) + + # Initialize empty results for skipped analyses + correlation_results = {} + var_cov_results = {} + pivot_results = {} + distribution_results = {} + else: + # Full analysis mode + # Correlation analysis + print(" - Correlation analysis...") + correlation_results = stat_analyzer.correlation_analysis(df) + + # Regression analysis + print(" - Regression analysis...") + complexity_features = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity', 'max_inheritance_depth' + ] + regression_results = stat_analyzer.regression_analysis( + df, complexity_features, target='fix_count' + ) + + # Hypothesis testing + print(" - Hypothesis testing...") + hypothesis_results = stat_analyzer.hypothesis_testing(df) + + # T-test analysis + print(" - T-test analysis...") + t_test_results = stat_analyzer.t_test_analysis(df) + + # Z-test analysis + print(" - Z-test analysis...") + z_test_results = 
stat_analyzer.z_test_analysis(df) + + # Confidence intervals + print(" - Confidence intervals...") + ci_results = stat_analyzer.confidence_intervals( + df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count'] + ) + + # Variance-covariance analysis + print(" - Variance-covariance analysis...") + var_cov_results = stat_analyzer.variance_covariance_analysis(df) + + # Pivot table analysis + print(" - Pivot table analysis...") + pivot_results = stat_analyzer.pivot_table_analysis(df) + + # Discrete distribution analysis + print(" - Discrete distribution analysis...") + distribution_results = stat_analyzer.discrete_distribution_analysis(df) + + # Save analysis results + step_num = "5" if use_existing else "6" + print(f"\n{step_num}. Saving analysis results...") + results = { + 'correlation_analysis': correlation_results, + 'regression_analysis': regression_results, + 'hypothesis_testing': hypothesis_results, + 't_test_analysis': t_test_results if 't_test_results' in locals() else {}, + 'z_test_analysis': z_test_results if 'z_test_results' in locals() else {}, + 'confidence_intervals': ci_results, + 'variance_covariance': var_cov_results, + 'pivot_tables': {k: v.to_dict() if isinstance(v, pd.DataFrame) else v + for k, v in pivot_results.items()}, + 'distribution_analysis': distribution_results, + 'repository_summaries': repo_summaries, + 'analysis_mode': 'focused' if FOCUSED_MODE else 'full' + } + + # Convert numpy types to native Python types for JSON serialization + def convert_to_serializable(obj): + if isinstance(obj, (np.integer, np.floating)): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, dict): + return {k: convert_to_serializable(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_to_serializable(item) for item in obj] + elif isinstance(obj, pd.DataFrame): + return obj.to_dict() + return obj + + results_serializable = convert_to_serializable(results) + + with 
open(Path(OUTPUT_DIR) / 'analysis_results.json', 'w') as f: + json.dump(results_serializable, f, indent=2, default=str) + print(f" ✓ Saved analysis results to {OUTPUT_DIR}/analysis_results.json") + + # Create visualizations + print("\n" + ("6" if use_existing else "7") + ". Creating visualizations...") + + if FOCUSED_MODE: + print(" [FOCUSED MODE: Regression visualizations only]") + + # Scatter plots for key relationships + print(" - Scatter plots...") + visualizer.plot_complexity_vs_issues_scatter( + df, 'loc', 'fix_count', + 'loc_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'cognitive_complexity', 'fix_count', + 'cognitive_complexity_vs_fixes.png' + ) + + # Regression results + if regression_results: + print(" - Regression results...") + visualizer.plot_regression_results(regression_results) + else: + # Full visualization mode + # Correlation heatmap + print(" - Correlation heatmap...") + visualizer.plot_correlation_heatmap( + df, + complexity_cols=['loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity'], + issue_cols=['fix_count', 'total_fixes'] + ) + + # Scatter plots + print(" - Scatter plots...") + visualizer.plot_complexity_vs_issues_scatter( + df, 'cyclomatic_complexity', 'fix_count', + 'cyclomatic_complexity_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'cognitive_complexity', 'fix_count', + 'cognitive_complexity_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'loc', 'fix_count', + 'loc_vs_fixes.png' + ) + + # Module comparison + print(" - Module comparison...") + visualizer.plot_module_complexity_comparison( + df, 'cyclomatic_complexity' + ) + + # Distribution analysis + print(" - Distribution analysis...") + visualizer.plot_distribution_analysis(df, 'cyclomatic_complexity') + visualizer.plot_distribution_analysis(df, 'fix_count', + 'fix_distribution.png') + + # Regression results + if regression_results: + print(" - Regression results...") + 
visualizer.plot_regression_results(regression_results) + + print(f" ✓ Saved visualizations to {FIGURES_DIR}/") + + # Print summary statistics + step_num = "7" if use_existing else "8" + print(f"\n{step_num}. Summary Statistics") + print("=" * 80) + print(f"\nTotal files analyzed: {len(df)}") + if repo_summaries: + print(f"Total repositories: {len(repo_summaries)}") + + print("\nComplexity Metrics (Mean ± Std):") + for metric in ['loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity']: + if metric in df.columns: + mean_val = df[metric].mean() + std_val = df[metric].std() + print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}") + + print("\nFix Metrics (Mean ± Std):") + for metric in ['fix_count', 'total_fixes']: + if metric in df.columns: + mean_val = df[metric].mean() + std_val = df[metric].std() + print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}") + + if not FOCUSED_MODE: + print("\nSignificant Correlations:") + sig_corr = correlation_results.get('significant_correlations', {}) + if sig_corr: + for metric, corr_value in list(sig_corr.items())[:10]: + print(f" {metric}: {corr_value:.3f}") + else: + print(" None found") + + if regression_results: + print(f"\nRegression Analysis:") + print(f" R²: {regression_results.get('r_squared', 0):.3f}") + print(f" RMSE: {regression_results.get('rmse', 0):.3f}") + if regression_results.get('significant_features'): + print(f" Significant features: {regression_results['significant_features']}") + + # T-test results + if 't_test_results' in locals() and t_test_results: + print(f"\nT-Test Results (High-fix vs Low-fix files):") + for metric, result in t_test_results.items(): + if 'error' not in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {metric}:") + print(f" t-statistic: {result.get('t_statistic', 0):.3f}") + print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}") + print(f" High-fix mean: {result.get('high_fix_mean', 0):.2f}") + print(f" 
Low-fix mean: {result.get('low_fix_mean', 0):.2f}") + + # Z-test results + if 'z_test_results' in locals() and z_test_results: + print(f"\nZ-Test Results (High-fix vs Low-fix files):") + for metric, result in z_test_results.items(): + if 'error' not in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {metric}:") + print(f" z-statistic: {result.get('z_statistic', 0):.3f}") + print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}") + print(f" Mean difference: {result.get('mean_difference', 0):.2f}") + print(f" 95% CI: [{result.get('ci_lower', 0):.2f}, {result.get('ci_upper', 0):.2f}]") + + # Hypothesis testing results + if hypothesis_results: + print(f"\nHypothesis Testing (ANOVA/Kruskal-Wallis):") + for test_name, result in list(hypothesis_results.items())[:5]: + if isinstance(result, dict) and 'p_value' in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {test_name}: p={result.get('p_value', 1):.4f} {sig_marker}") + + print("\n" + "=" * 80) + print("Analysis complete!") + print(f"Results saved to: {OUTPUT_DIR}/") + print(f"Figures saved to: {FIGURES_DIR}/") + print("=" * 80) + + # Cleanup (only if we collected data) + if not use_existing: + data_collector.cleanup() + + +if __name__ == '__main__': + main() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e5b3df3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +requests>=2.31.0 +radon>=6.0.1 +lizard>=1.17.10 +pandas>=2.0.0 +numpy>=1.24.0 +scipy>=1.11.0 +matplotlib>=3.7.0 +seaborn>=0.12.0 +scikit-learn>=1.3.0 +PyGithub>=1.59.0 +tqdm>=4.65.0 +python-dotenv>=1.0.0 + diff --git a/statistical_analysis.py b/statistical_analysis.py new file mode 100644 index 0000000..2a50f4e --- /dev/null +++ b/statistical_analysis.py @@ -0,0 +1,553 @@ +""" +Statistical analysis module for code metrics and issue data. 
def __init__(self, significance_level: float = 0.05, confidence_level: float = 0.95):
    """Store the alpha level and confidence level used by every test."""
    self.significance_level = significance_level
    self.confidence_level = confidence_level

def prepare_dataframe(self, data: List[Dict]) -> pd.DataFrame:
    """Convert a list of per-file metric dicts into a DataFrame."""
    return pd.DataFrame(data)

def correlation_analysis(self, df: pd.DataFrame) -> Dict:
    """Correlate complexity metrics with issue/fix metrics.

    For every (complexity, issue) column pair present in *df*, computes
    Pearson and Spearman coefficients and their p-values, skipping pairs
    with fewer than 3 complete observations.

    Returns a dict with 'correlations', 'p_values', and
    'significant_correlations' (entries with p < self.significance_level).
    """
    results = {}

    complexity_metrics = [
        'loc', 'lloc', 'sloc', 'cyclomatic_complexity',
        'cognitive_complexity', 'max_complexity', 'avg_complexity',
        'max_inheritance_depth', 'maintainability_index'
    ]
    issue_metrics = ['fix_count', 'total_fixes']

    # Only correlate columns that actually exist in this dataset.
    complexity_cols = [col for col in complexity_metrics if col in df.columns]
    issue_cols = [col for col in issue_metrics if col in df.columns]

    correlations = {}
    p_values = {}

    for comp_col in complexity_cols:
        for issue_col in issue_cols:
            # Pairwise-complete observations only.
            mask = df[[comp_col, issue_col]].notna().all(axis=1)
            if mask.sum() < 3:  # correlation is meaningless below 3 points
                continue

            x = df.loc[mask, comp_col]
            y = df.loc[mask, issue_col]

            pearson_r, pearson_p = pearsonr(x, y)
            correlations[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_r
            p_values[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_p

            # Spearman: non-parametric, robust to monotone nonlinearity.
            spearman_r, spearman_p = spearmanr(x, y)
            correlations[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_r
            p_values[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_p

    results['correlations'] = correlations
    results['p_values'] = p_values
    # BUG FIX: the original filtered via a chain of no-op str.replace calls
    # (replacing '_pearson' with '_pearson'); the correlation and p-value
    # dicts share identical keys, so a direct lookup is correct and clear.
    results['significant_correlations'] = {
        k: v for k, v in correlations.items()
        if p_values.get(k, 1) < self.significance_level
    }

    return results
def regression_analysis(self, df: pd.DataFrame,
                        complexity_features: List[str],
                        target: str = 'fix_count') -> Dict:
    """OLS regression of *target* on standardized complexity features.

    Drops features missing from *df*, rows with missing values, and one
    member of any feature pair with |r| > 0.95 (multicollinearity).
    Returns R^2, RMSE, per-feature coefficients, p-values, confidence
    intervals, and the significant-feature list; returns {} when there is
    not enough usable data.
    """
    results = {}

    feature_cols = [col for col in complexity_features if col in df.columns]
    if not feature_cols:
        return results

    # Need more complete rows than parameters for an identifiable fit.
    mask = df[feature_cols + [target]].notna().all(axis=1)
    if mask.sum() < len(feature_cols) + 1:
        return results

    X = df.loc[mask, feature_cols]
    y = df.loc[mask, target]

    # Drop near-duplicate features; scanning upper-triangle columns keeps
    # the earlier feature of each highly correlated pair.
    if len(feature_cols) > 1:
        corr_matrix = X.corr().abs()
        upper_triangle = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        high_corr_features = [column for column in upper_triangle.columns
                              if any(upper_triangle[column] > 0.95)]
        if high_corr_features:
            feature_cols = [f for f in feature_cols
                            if f not in high_corr_features]
            X = X[feature_cols]

    if len(feature_cols) == 0:
        return results

    # Standardize so coefficients are comparable across features.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    model = LinearRegression()
    model.fit(X_scaled, y)
    y_pred = model.predict(X_scaled)

    r_squared = model.score(X_scaled, y)
    rmse = np.sqrt(np.mean((y - y_pred) ** 2))

    coefficients = dict(zip(feature_cols, model.coef_))
    intercept = model.intercept_

    # Classical OLS inference on the coefficients.
    n = len(y)
    p = len(feature_cols)
    residuals = y - y_pred
    mse_residual = np.sum(residuals ** 2) / (n - p - 1)

    X_with_intercept = np.column_stack([np.ones(n), X_scaled])
    XTX = X_with_intercept.T @ X_with_intercept

    try:
        # Fall back to the pseudo-inverse when X'X is near-singular.
        if np.linalg.cond(XTX) > 1e12:
            cov_matrix = mse_residual * np.linalg.pinv(XTX)
        else:
            cov_matrix = mse_residual * np.linalg.inv(XTX)
        std_errors = np.sqrt(np.diag(cov_matrix))[1:]  # skip intercept

        t_stats = model.coef_ / std_errors
        p_values = 2 * (1 - stats.t.cdf(np.abs(t_stats), n - p - 1))

        alpha = 1 - self.confidence_level
        t_critical = stats.t.ppf(1 - alpha / 2, n - p - 1)
        ci_lower = model.coef_ - t_critical * std_errors
        ci_upper = model.coef_ + t_critical * std_errors
    except (np.linalg.LinAlgError, ValueError):
        try:
            cov_matrix = mse_residual * np.linalg.pinv(XTX)
            std_errors = np.sqrt(np.diag(cov_matrix))[1:]
            # Guard against zero/NaN standard errors from the pinv path.
            std_errors = np.where(np.isnan(std_errors) | (std_errors == 0),
                                  np.inf, std_errors)
            t_stats = model.coef_ / std_errors
            p_values = np.where(np.isfinite(t_stats),
                                2 * (1 - stats.t.cdf(np.abs(t_stats),
                                                     n - p - 1)),
                                np.nan)
            alpha = 1 - self.confidence_level
            t_critical = stats.t.ppf(1 - alpha / 2, n - p - 1)
            ci_lower = model.coef_ - t_critical * std_errors
            ci_upper = model.coef_ + t_critical * std_errors
        except Exception:
            # BUG FIX: this was a bare `except:`, which also traps
            # KeyboardInterrupt/SystemExit; narrowed to Exception while
            # keeping the NaN-default fallback behavior.
            std_errors = np.full(len(feature_cols), np.nan)
            p_values = np.full(len(feature_cols), np.nan)
            ci_lower = np.full(len(feature_cols), np.nan)
            ci_upper = np.full(len(feature_cols), np.nan)

    results['r_squared'] = r_squared
    results['rmse'] = rmse
    results['coefficients'] = coefficients
    results['intercept'] = intercept
    results['p_values'] = dict(zip(feature_cols, p_values))
    results['confidence_intervals'] = {
        col: (lower, upper) for col, lower, upper in
        zip(feature_cols, ci_lower, ci_upper)
    }
    results['significant_features'] = [
        col for col, p_val in zip(feature_cols, p_values)
        if p_val < self.significance_level
    ]

    return results
def hypothesis_testing(self, df: pd.DataFrame) -> Dict:
    """Test whether complexity differs significantly across modules.

    For each available complexity metric, runs a one-way ANOVA and a
    Kruskal-Wallis H test over per-module samples; results are stored
    under '<metric>_anova' and '<metric>_kruskal_wallis'.

    Requires a 'module' column and at least two non-empty module groups;
    returns {} otherwise.
    """
    results = {}

    if 'module' not in df.columns:
        return results

    modules = df['module'].unique()
    if len(modules) < 2:
        return results

    complexity_metrics = [
        'cyclomatic_complexity', 'cognitive_complexity',
        'avg_complexity', 'loc'
    ]

    for metric in complexity_metrics:
        if metric not in df.columns:
            continue

        # Per-module samples with NaNs removed; empty groups are dropped.
        data_by_module = [
            df[df['module'] == module][metric].dropna().values
            for module in modules
            if len(df[df['module'] == module][metric].dropna()) > 0
        ]

        if len(data_by_module) < 2:
            continue

        # BUG FIX: the two clauses below were bare `except: pass`, which
        # also swallowed KeyboardInterrupt/SystemExit; they now suppress
        # only ValueError (degenerate input), keeping best-effort behavior.
        # One-way ANOVA (parametric).
        try:
            f_stat, p_value = stats.f_oneway(*data_by_module)
            results[f'{metric}_anova'] = {
                'f_statistic': float(f_stat),
                'p_value': float(p_value),
                # bool() keeps the flag JSON-serializable (np.bool_ is not)
                'significant': bool(p_value < self.significance_level)
            }
        except ValueError:
            pass

        # Kruskal-Wallis H test (non-parametric alternative).
        try:
            h_stat, p_value_kw = stats.kruskal(*data_by_module)
            results[f'{metric}_kruskal_wallis'] = {
                'h_statistic': float(h_stat),
                'p_value': float(p_value_kw),
                'significant': bool(p_value_kw < self.significance_level)
            }
        except ValueError:
            pass

    return results
+ """ + results = {} + + if 'fix_count' not in df.columns: + return results + + # Split files into high-fix and low-fix groups + median_fixes = df['fix_count'].median() + high_fix_mask = df['fix_count'] > median_fixes + low_fix_mask = df['fix_count'] <= median_fixes + + complexity_metrics = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'avg_complexity', 'max_complexity', 'max_inheritance_depth' + ] + + for metric in complexity_metrics: + if metric not in df.columns: + continue + + high_fix_data = df.loc[high_fix_mask, metric].dropna() + low_fix_data = df.loc[low_fix_mask, metric].dropna() + + if len(high_fix_data) < 2 or len(low_fix_data) < 2: + continue + + # Independent samples t-test (assuming unequal variances) + try: + t_stat, p_value = stats.ttest_ind(high_fix_data, low_fix_data, + equal_var=False) + results[f'{metric}_t_test'] = { + 't_statistic': float(t_stat), + 'p_value': float(p_value), + 'significant': p_value < self.significance_level, + 'high_fix_mean': float(high_fix_data.mean()), + 'low_fix_mean': float(low_fix_data.mean()), + 'high_fix_std': float(high_fix_data.std()), + 'low_fix_std': float(low_fix_data.std()), + 'high_fix_n': len(high_fix_data), + 'low_fix_n': len(low_fix_data) + } + except Exception as e: + results[f'{metric}_t_test'] = {'error': str(e)} + + return results + + def z_test_analysis(self, df: pd.DataFrame) -> Dict: + """ + Perform z-tests to compare complexity metrics between high-fix and low-fix files. + Z-test assumes known population variance (uses sample variance as approximation). 
+ """ + results = {} + + if 'fix_count' not in df.columns: + return results + + # Split files into high-fix and low-fix groups + median_fixes = df['fix_count'].median() + high_fix_mask = df['fix_count'] > median_fixes + low_fix_mask = df['fix_count'] <= median_fixes + + complexity_metrics = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'avg_complexity', 'max_complexity', 'max_inheritance_depth' + ] + + for metric in complexity_metrics: + if metric not in df.columns: + continue + + high_fix_data = df.loc[high_fix_mask, metric].dropna() + low_fix_data = df.loc[low_fix_mask, metric].dropna() + + if len(high_fix_data) < 30 or len(low_fix_data) < 30: + # Z-test requires large sample sizes (n >= 30) + continue + + try: + # Calculate means and standard errors + mean1 = high_fix_data.mean() + mean2 = low_fix_data.mean() + std1 = high_fix_data.std() + std2 = low_fix_data.std() + n1 = len(high_fix_data) + n2 = len(low_fix_data) + + # Standard error of the difference + se_diff = np.sqrt((std1**2 / n1) + (std2**2 / n2)) + + # Z-statistic + z_stat = (mean1 - mean2) / se_diff + + # Two-tailed p-value + p_value = 2 * (1 - stats.norm.cdf(abs(z_stat))) + + # Confidence interval for difference + alpha = 1 - self.confidence_level + z_critical = stats.norm.ppf(1 - alpha/2) + ci_lower = (mean1 - mean2) - z_critical * se_diff + ci_upper = (mean1 - mean2) + z_critical * se_diff + + results[f'{metric}_z_test'] = { + 'z_statistic': float(z_stat), + 'p_value': float(p_value), + 'significant': p_value < self.significance_level, + 'high_fix_mean': float(mean1), + 'low_fix_mean': float(mean2), + 'mean_difference': float(mean1 - mean2), + 'ci_lower': float(ci_lower), + 'ci_upper': float(ci_upper), + 'high_fix_n': n1, + 'low_fix_n': n2 + } + except Exception as e: + results[f'{metric}_z_test'] = {'error': str(e)} + + return results + + def confidence_intervals(self, df: pd.DataFrame, + metrics: List[str]) -> Dict: + """Calculate confidence intervals for various metrics.""" + 
def confidence_intervals(self, df: pd.DataFrame,
                         metrics: List[str]) -> Dict:
    """t-based confidence intervals for the mean of each listed metric.

    Metrics missing from *df* or with fewer than two non-NaN values are
    skipped. Each entry reports mean, std, CI bounds and the level used.
    """
    results = {}
    tail = (1 - self.confidence_level) / 2

    for metric in metrics:
        if metric not in df.columns:
            continue
        sample = df[metric].dropna()
        if len(sample) < 2:
            continue

        center = sample.mean()
        # Half-width from the t distribution with n-1 degrees of freedom.
        half_width = stats.t.ppf(1 - tail, len(sample) - 1) * stats.sem(sample)
        results[metric] = {
            'mean': center,
            'std': sample.std(),
            'ci_lower': center - half_width,
            'ci_upper': center + half_width,
            'confidence_level': self.confidence_level,
        }

    return results

def variance_covariance_analysis(self, df: pd.DataFrame) -> Dict:
    """Variance-covariance and correlation matrices for complexity metrics.

    Uses complete rows over the available complexity columns; returns {}
    when fewer than two columns or fewer than two complete rows exist.
    """
    candidate_cols = [c for c in ('loc', 'cyclomatic_complexity',
                                  'cognitive_complexity', 'max_complexity',
                                  'avg_complexity', 'max_inheritance_depth')
                      if c in df.columns]
    if len(candidate_cols) < 2:
        return {}

    clean = df[candidate_cols].dropna()
    if len(clean) < 2:
        return {}

    return {
        'covariance_matrix': clean.cov(),
        'correlation_matrix': clean.corr(),
        'variances': clean.var().to_dict(),
    }
def pivot_table_analysis(self, df: pd.DataFrame) -> Dict:
    """Cross-tabulate modules against binned complexity / fix counts.

    Returns up to two pivot tables of per-bin mean values, keyed
    'module_complexity_pivot' and 'module_fixes_pivot'; requires a
    'module' column.

    BUG FIX: the original wrote a 'complexity_category' column back onto
    the caller's DataFrame as a hidden side effect; binning is now done
    on local Series so *df* is left untouched.
    """
    results = {}

    if 'module' not in df.columns:
        return results

    if 'cyclomatic_complexity' in df.columns:
        # Bin complexity locally instead of mutating the input frame.
        complexity_category = pd.cut(
            df['cyclomatic_complexity'],
            bins=[0, 10, 25, 50, float('inf')],
            labels=['Low', 'Medium', 'High', 'Very High']
        ).rename('complexity_category')

        # Pivot: Module vs Complexity Category (mean complexity per cell)
        pivot = pd.crosstab(df['module'], complexity_category,
                            values=df['cyclomatic_complexity'],
                            aggfunc='mean')
        results['module_complexity_pivot'] = pivot

    # Pivot: Module vs Fix Count bucket (mean fix count per cell)
    if 'fix_count' in df.columns:
        fix_category = pd.cut(df['fix_count'],
                              bins=[0, 1, 5, 10, float('inf')],
                              labels=['None', 'Low', 'Medium', 'High'])
        pivot_fixes = pd.crosstab(df['module'], fix_category,
                                  values=df['fix_count'],
                                  aggfunc='mean')
        results['module_fixes_pivot'] = pivot_fixes

    return results
def discrete_distribution_analysis(self, df: pd.DataFrame) -> Dict:
    """Analyze the discrete distribution of per-file fix counts.

    Fits a Poisson model (lambda = sample mean) and, when enough
    well-populated bins exist, runs a chi-square goodness-of-fit test
    ('poisson_fit'). Always reports moment statistics under
    'distribution_summary' when data is present.
    """
    results = {}

    if 'fix_count' not in df.columns:
        return results

    issue_counts = df['fix_count'].dropna()
    # Robustness: an empty series would make max()/int() below blow up.
    if len(issue_counts) == 0:
        return results

    # Poisson MLE: lambda is the sample mean.
    lambda_poisson = issue_counts.mean()
    poisson_dist = stats.poisson(lambda_poisson)

    observed_freq = issue_counts.value_counts().sort_index()
    max_observed = int(observed_freq.index.max())

    # Keep only count bins whose expected frequency is >= 5 (the usual
    # validity rule for the chi-square approximation).
    observed_array = []
    expected_array = []
    for k in range(max_observed + 1):
        exp_count = poisson_dist.pmf(k) * len(issue_counts)
        if exp_count >= 5:
            observed_array.append(observed_freq.get(k, 0))
            expected_array.append(exp_count)

    # BUG FIX: the original ran the test with as few as one bin and no
    # degrees-of-freedom correction even though lambda was estimated from
    # the same data. We now require at least 3 bins (so df >= 1 after the
    # correction) and pass ddof=1 to account for the estimated parameter.
    if len(observed_array) >= 3:
        observed_array = np.asarray(observed_array, dtype=float)
        expected_array = np.asarray(expected_array, dtype=float)

        expected_sum = expected_array.sum()
        if expected_sum > 0:
            # Rescale expectations so both vectors share one total, as
            # scipy.stats.chisquare requires.
            expected_array *= observed_array.sum() / expected_sum

            try:
                chi2_stat, p_value = stats.chisquare(
                    observed_array, expected_array, ddof=1
                )
                results['poisson_fit'] = {
                    'lambda': lambda_poisson,
                    'chi2_statistic': float(chi2_stat),
                    'p_value': float(p_value),
                    'fits': bool(p_value >= self.significance_level)
                }
            except (ValueError, RuntimeError) as e:
                # Degenerate inputs: record the failure instead of raising.
                results['poisson_fit'] = {
                    'lambda': lambda_poisson,
                    'chi2_statistic': None,
                    'p_value': None,
                    'fits': None,
                    'error': str(e)
                }

    # Moment statistics of the raw fix-count distribution.
    results['distribution_summary'] = {
        'mean': issue_counts.mean(),
        'variance': issue_counts.var(),
        'std': issue_counts.std(),
        'skewness': stats.skew(issue_counts),
        'kurtosis': stats.kurtosis(issue_counts)
    }

    return results
def __init__(self, output_dir: str = 'figures'):
    """Create the output directory and apply global plot styling."""
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(exist_ok=True)
    sns.set_style("whitegrid")
    plt.rcParams['figure.figsize'] = (12, 8)

def plot_correlation_heatmap(self, df: pd.DataFrame,
                             complexity_cols: List[str],
                             issue_cols: List[str],
                             filename: str = 'correlation_heatmap.png'):
    """Heatmap of pairwise Pearson correlations (complexity vs issues).

    Cells with fewer than 3 complete observations stay masked; nothing
    is drawn when no valid correlation exists at all.
    """
    rows = [c for c in complexity_cols if c in df.columns]
    cols = [c for c in issue_cols if c in df.columns]
    if not rows or not cols:
        return

    # NaN-filled grid; valid cells are overwritten below.
    grid = np.full((len(rows), len(cols)), np.nan)
    for r, comp in enumerate(rows):
        for c, iss in enumerate(cols):
            ok = df[[comp, iss]].notna().all(axis=1)
            if ok.sum() >= 3:
                val = df.loc[ok, comp].corr(df.loc[ok, iss],
                                            method='pearson')
                if not np.isnan(val):
                    grid[r, c] = val

    if np.isnan(grid).all():
        return

    fig, ax = plt.subplots(figsize=(14, 10))
    sns.heatmap(grid, annot=True, fmt='.3f', cmap='coolwarm',
                center=0, vmin=-1, vmax=1, ax=ax,
                xticklabels=cols, yticklabels=rows,
                mask=np.isnan(grid))
    ax.set_title('Correlation Heatmap: Complexity Metrics vs Issue Metrics',
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
    plt.close()
def plot_complexity_vs_issues_scatter(self, df: pd.DataFrame,
                                      complexity_col: str,
                                      issue_col: str,
                                      filename: Optional[str] = None):
    """Scatter plot of a complexity metric against an issue metric.

    Adds a least-squares trend line when the x values have spread; saves
    to *filename* inside self.output_dir when one is given.

    BUG FIXES: the original allocated the figure before validating the
    data and returned early without closing it (leaking a matplotlib
    figure); it also called ax.legend() even when no labeled artist
    existed, emitting a warning. Validation now precedes plt.subplots()
    and legend() runs only when the trend line was drawn.
    """
    if complexity_col not in df.columns or issue_col not in df.columns:
        return

    mask = df[[complexity_col, issue_col]].notna().all(axis=1)
    if mask.sum() < 3:
        return

    x = df.loc[mask, complexity_col]
    y = df.loc[mask, issue_col]

    # Drop infinities that would break both plotting and polyfit.
    valid_mask = np.isfinite(x) & np.isfinite(y)
    x = x[valid_mask]
    y = y[valid_mask]

    if len(x) < 2:
        return  # nothing meaningful to plot

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.scatter(x, y, alpha=0.5, s=50)

    # Trend line only when x is not (numerically) constant.
    has_trend_line = False
    try:
        if x.std() > 1e-10:
            z = np.polyfit(x, y, 1)
            p = np.poly1d(z)
            x_sorted = np.sort(x)
            ax.plot(x_sorted, p(x_sorted), "r--", alpha=0.8, linewidth=2,
                    label=f'Trend line (slope={z[0]:.3f})')
            has_trend_line = True
    except (np.linalg.LinAlgError, ValueError, RuntimeError):
        pass  # fit failed; keep the bare scatter

    ax.set_xlabel(complexity_col.replace('_', ' ').title(), fontsize=12)
    ax.set_ylabel(issue_col.replace('_', ' ').title(), fontsize=12)
    ax.set_title(f'{complexity_col.replace("_", " ").title()} vs '
                 f'{issue_col.replace("_", " ").title()}',
                 fontsize=14, fontweight='bold')
    if has_trend_line:
        ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    if filename:
        plt.savefig(self.output_dir / filename, dpi=300,
                    bbox_inches='tight')
    plt.close()

def plot_module_complexity_comparison(self, df: pd.DataFrame,
                                      complexity_col: str,
                                      filename: str = 'module_complexity_comparison.png'):
    """Box plot plus mean bar chart of a complexity metric per module."""
    if 'module' not in df.columns or complexity_col not in df.columns:
        return

    mask = df[complexity_col].notna()
    if mask.sum() == 0:
        return

    fig, axes = plt.subplots(2, 1, figsize=(14, 10))

    # Box plot: full per-module distributions.
    modules = df.loc[mask, 'module'].unique()
    data_by_module = [df.loc[mask & (df['module'] == mod), complexity_col].values
                      for mod in modules]

    axes[0].boxplot(data_by_module, labels=modules)
    axes[0].set_ylabel(complexity_col.replace('_', ' ').title(), fontsize=12)
    axes[0].set_title(f'{complexity_col.replace("_", " ").title()} by Module',
                      fontsize=14, fontweight='bold')
    axes[0].tick_params(axis='x', rotation=45)
    axes[0].grid(True, alpha=0.3)

    # Bar plot: per-module means, largest first.
    module_means = df.loc[mask].groupby('module')[complexity_col].mean().sort_values(ascending=False)
    axes[1].bar(range(len(module_means)), module_means.values)
    axes[1].set_xticks(range(len(module_means)))
    axes[1].set_xticklabels(module_means.index, rotation=45, ha='right')
    axes[1].set_ylabel(f'Mean {complexity_col.replace("_", " ").title()}', fontsize=12)
    axes[1].set_title(f'Average {complexity_col.replace("_", " ").title()} by Module',
                      fontsize=14, fontweight='bold')
    axes[1].grid(True, alpha=0.3, axis='y')

    plt.tight_layout()
    plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
    plt.close()
def plot_distribution_analysis(self, df: pd.DataFrame,
                               metric_col: str,
                               filename: str = 'distribution_analysis.png'):
    """Four-panel distribution view: histogram, Q-Q plot, box plot, ECDF."""
    if metric_col not in df.columns:
        return

    sample = df[metric_col].dropna()
    if len(sample) == 0:
        return

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    pretty = metric_col.replace('_', ' ').title()

    # Histogram of raw values.
    hist_ax = axes[0, 0]
    hist_ax.hist(sample, bins=30, edgecolor='black', alpha=0.7)
    hist_ax.set_xlabel(pretty, fontsize=12)
    hist_ax.set_ylabel('Frequency', fontsize=12)
    hist_ax.set_title('Histogram', fontsize=12, fontweight='bold')
    hist_ax.grid(True, alpha=0.3)

    # Q-Q plot against a normal reference.
    from scipy import stats
    qq_ax = axes[0, 1]
    stats.probplot(sample, dist="norm", plot=qq_ax)
    qq_ax.set_title('Q-Q Plot (Normal Distribution)', fontsize=12, fontweight='bold')
    qq_ax.grid(True, alpha=0.3)

    # Box plot for outlier visibility.
    box_ax = axes[1, 0]
    box_ax.boxplot(sample, vert=True)
    box_ax.set_ylabel(pretty, fontsize=12)
    box_ax.set_title('Box Plot', fontsize=12, fontweight='bold')
    box_ax.grid(True, alpha=0.3)

    # Empirical CDF.
    ecdf_ax = axes[1, 1]
    ordered = np.sort(sample)
    probs = np.arange(1, len(ordered) + 1) / len(ordered)
    ecdf_ax.plot(ordered, probs, linewidth=2)
    ecdf_ax.set_xlabel(pretty, fontsize=12)
    ecdf_ax.set_ylabel('Cumulative Probability', fontsize=12)
    ecdf_ax.set_title('Cumulative Distribution Function', fontsize=12, fontweight='bold')
    ecdf_ax.grid(True, alpha=0.3)

    plt.suptitle(f'Distribution Analysis: {pretty}',
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
    plt.close()
def plot_regression_results(self, regression_results: Dict,
                            filename: str = 'regression_results.png'):
    """Bar charts of regression coefficients (with CIs) and p-values.

    Features significant at p < 0.05 are drawn in red, others in gray.
    Does nothing when the results dict has no coefficients.
    """
    if 'coefficients' not in regression_results:
        return

    coefficients = regression_results['coefficients']
    if not coefficients:
        return

    p_values = regression_results.get('p_values', {})
    ci = regression_results.get('confidence_intervals', {})

    names = list(coefficients.keys())
    values = [coefficients[f] for f in names]
    bar_colors = ['red' if p_values.get(f, 1) < 0.05 else 'gray'
                  for f in names]
    positions = np.arange(len(names))

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    coef_ax, pval_ax = axes

    # Left panel: coefficient magnitudes with CI whiskers.
    coef_ax.barh(positions, values, color=bar_colors, alpha=0.7)
    for idx, name in enumerate(names):
        if name in ci:
            lo, hi = ci[name]
            coef_ax.plot([lo, hi], [idx, idx], 'k-', linewidth=2)
    coef_ax.set_yticks(positions)
    coef_ax.set_yticklabels(names)
    coef_ax.set_xlabel('Coefficient Value', fontsize=12)
    coef_ax.set_title('Regression Coefficients with 95% CI', fontsize=14, fontweight='bold')
    coef_ax.axvline(x=0, color='black', linestyle='--', linewidth=1)
    coef_ax.grid(True, alpha=0.3)

    # Right panel: p-values against the 0.05 threshold.
    pvals = [p_values.get(f, 1) for f in names]
    pval_ax.barh(positions, pvals, color=bar_colors, alpha=0.7)
    pval_ax.axvline(x=0.05, color='red', linestyle='--', linewidth=2,
                    label='Significance Level (0.05)')
    pval_ax.set_yticks(positions)
    pval_ax.set_yticklabels(names)
    pval_ax.set_xlabel('P-value', fontsize=12)
    pval_ax.set_title('P-values for Coefficients', fontsize=14, fontweight='bold')
    pval_ax.set_xlim([0, max(pvals) * 1.1])
    pval_ax.legend()
    pval_ax.grid(True, alpha=0.3)

    plt.suptitle(f'Regression Analysis Results (R² = {regression_results.get("r_squared", 0):.3f})',
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
    plt.close()