aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFuwn <[email protected]>2025-12-09 23:16:23 -0800
committerFuwn <[email protected]>2025-12-09 23:16:23 -0800
commit3ffcdb247df3f56c4c21c6fed83ee1af5fb94224 (patch)
tree409fe42bb385ca73bd1b152623465ee098434179
downloadmathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.tar.xz
mathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.zip
feat: Initial commitHEADmain
-rw-r--r--.gitignore33
-rw-r--r--README.md94
-rw-r--r--code_analyzer.py213
-rw-r--r--config.py64
-rw-r--r--data_collector.py251
-rw-r--r--github_client.py131
-rw-r--r--main.py438
-rw-r--r--requirements.txt13
-rw-r--r--statistical_analysis.py553
-rw-r--r--visualizer.py255
10 files changed, 2045 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..27755d4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+env/
+venv/
+ENV/
+.venv
+
+# Project specific
+results/
+figures/
+*.csv
+*.json
+.env
+
+# IDE
+.vscode/
+.idea/
+*.swp
+*.swo
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Temporary
+tmp/
+temp/
+*.tmp
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..83b74d4
--- /dev/null
+++ b/README.md
@@ -0,0 +1,94 @@
+# Code Metrics Analysis Project
+
+This project analyzes code metrics from open-source Python projects on GitHub to investigate relationships between code complexity and issues/fixes.
+
+## Features
+
+- **Code Metrics Analysis**: Measures LOC, cyclomatic complexity, cognitive complexity, inheritance depth, and maintainability index
+- **Git Commit Analysis**: Analyzes git commit logs to find commits with "fix" in the message and tracks which files were changed
+- **Statistical Analysis**:
+ - Correlation analysis (Pearson and Spearman)
+ - Linear regression modeling
+ - Hypothesis testing (ANOVA, Kruskal-Wallis)
+ - Confidence intervals
+ - Variance-covariance analysis
+ - Pivot tables
+ - Discrete distribution analysis
+- **Visualizations**: Creates comprehensive plots and charts
+
+## Setup
+
+1. **Install dependencies**:
+
+```bash
+pip install -r requirements.txt
+```
+
+2. **Set up GitHub API token** (optional but recommended):
+ - Create a `.env` file in the project root
+ - Add your GitHub token: `GITHUB_TOKEN=your_token_here`
+ - Get a token from: https://github.com/settings/tokens
+
+## Usage
+
+Run the main analysis script:
+
+```bash
+python main.py
+```
+
+The script will:
+
+1. Use a curated list of popular Python projects that use semantic commits
+2. Clone repositories with full git history
+3. Analyze code metrics for all Python files
+4. Parse git commit logs to find "fix" commits (using semantic commit formats like "fix:", "fix(scope):", etc.) and track changed files
+5. Perform statistical analysis
+6. Generate visualizations
+7. Save results to `results/` and `figures/` directories
+
+## Configuration
+
+Edit `config.py` to customize:
+
+- Number of repositories to analyze (`MAX_REPOSITORIES`)
+- Minimum stars for repository selection (`MIN_STARS`)
+- Excluded directories (`EXCLUDE_DIRS`)
+- Statistical significance level (`SIGNIFICANCE_LEVEL`)
+- Confidence level (`CONFIDENCE_LEVEL`)
+
+## Output
+
+- `results/raw_metrics.csv`: All collected code metrics
+- `results/analysis_results.json`: Statistical analysis results
+- `figures/`: Various visualization plots
+
+## Project Structure
+
+- `main.py`: Main orchestration script
+- `github_client.py`: GitHub API client
+- `code_analyzer.py`: Code metrics analyzer
+- `data_collector.py`: Data collection pipeline
+- `statistical_analysis.py`: Statistical analysis functions
+- `visualizer.py`: Visualization functions
+- `config.py`: Configuration settings
+
+## Requirements
+
+- Python 3.8+
+- Git (for cloning repositories)
+- GitHub API token (optional, increases rate limits)
+
+## Notes
+
+- The analysis focuses on popular Python projects that use semantic commits
+- Fix detection recognizes semantic commit formats:
+ - `fix:` (conventional commits)
+ - `fix(scope):` (conventional commits with scope)
+ - `Fix:`, `FIX:` (case variations)
+ - `fixes #123`, `fix #123` (issue references)
+ - `fixed`, `fixing`, `bugfix`, `bug fix` (variations)
+- Only Python files (.py) are tracked for fix commits
+- Full git history is cloned (not shallow) to analyze all commits
+- Temporary cloned repositories are cleaned up after analysis
+- The curated repository list can be modified in `config.py`
diff --git a/code_analyzer.py b/code_analyzer.py
new file mode 100644
index 0000000..5b0b313
--- /dev/null
+++ b/code_analyzer.py
@@ -0,0 +1,213 @@
+"""
+Code metrics analyzer for Python files.
+Analyzes LOC, complexity, and other metrics.
+"""
+import os
+import ast
+from typing import Dict, List, Optional
+from pathlib import Path
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from functools import partial
+import radon.complexity as radon_complexity
+from radon.metrics import mi_visit
+from radon.raw import analyze
+
+
class CodeAnalyzer:
    """Analyzer for Python code metrics.

    Computes size metrics (LOC/LLOC/SLOC, comments, blanks), cyclomatic
    complexity (radon), an approximated cognitive complexity (lizard),
    maintainability index, and same-module inheritance depth.
    """

    def __init__(self, exclude_dirs: Optional[List[str]] = None):
        # Directory names that disqualify any file located under them.
        self.exclude_dirs = exclude_dirs or []

    def should_analyze(self, file_path: str) -> bool:
        """Return True if file_path is a .py file outside all excluded dirs."""
        path = Path(file_path)

        # Check if in excluded directory
        for part in path.parts:
            if part in self.exclude_dirs:
                return False

        # Check if Python file
        return path.suffix == '.py'

    def analyze_file(self, file_path: str) -> Optional[Dict]:
        """Analyze a single Python file and return a metrics dict.

        Returns None for excluded, empty, or unreadable files; individual
        metric groups degrade to 0 rather than failing the whole file.
        """
        if not self.should_analyze(file_path):
            return None

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if not code.strip():
                return None

            metrics = {}

            # Lines of Code (LOC)
            raw_metrics = analyze(code)
            metrics['loc'] = raw_metrics.loc
            metrics['lloc'] = raw_metrics.lloc  # Logical lines of code
            metrics['sloc'] = raw_metrics.sloc  # Source lines of code
            metrics['comments'] = raw_metrics.comments
            metrics['blank_lines'] = raw_metrics.blank

            # Cyclomatic Complexity using Radon
            try:
                complexity_results = radon_complexity.cc_visit(code)
                total_complexity = sum(func.complexity for func in complexity_results)
                avg_complexity = (total_complexity / len(complexity_results)
                                  if complexity_results else 0)
                max_complexity = max((func.complexity for func in complexity_results),
                                     default=0)

                metrics['cyclomatic_complexity'] = total_complexity
                metrics['avg_complexity'] = avg_complexity
                metrics['max_complexity'] = max_complexity
                metrics['functions'] = len(complexity_results)
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                metrics['cyclomatic_complexity'] = 0
                metrics['avg_complexity'] = 0
                metrics['max_complexity'] = 0
                metrics['functions'] = 0

            # Cognitive Complexity calculation.
            # Lizard doesn't provide cognitive complexity directly; we
            # approximate it as cyclomatic complexity plus a nesting-depth
            # penalty, since cognitive complexity penalizes nesting more
            # heavily than cyclomatic complexity does.
            try:
                import lizard  # local import: treated as an optional dependency
                lizard_result = lizard.analyze_file(file_path)

                if lizard_result and lizard_result.function_list:
                    total_cognitive = 0
                    for func in lizard_result.function_list:
                        # Get cyclomatic complexity (base)
                        base_cc = getattr(func, 'cyclomatic_complexity', 1)
                        # Get nesting depth (cognitive complexity penalizes nesting)
                        nesting_depth = getattr(func, 'max_nesting_depth', 0)
                        # Approximation formula: CC + (nesting_depth * 2)
                        cognitive = base_cc + (nesting_depth * 2)
                        total_cognitive += max(0, cognitive)  # Ensure non-negative

                    metrics['cognitive_complexity'] = total_cognitive
                    metrics['avg_cognitive_complexity'] = (
                        total_cognitive / len(lizard_result.function_list)
                        if lizard_result.function_list else 0
                    )
                else:
                    # No functions found, set to 0
                    metrics['cognitive_complexity'] = 0
                    metrics['avg_cognitive_complexity'] = 0
            except Exception:
                # If lizard fails, fall back to cyclomatic complexity so we
                # still have some complexity metric for this file.
                metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0)
                metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0)

            # Maintainability Index
            try:
                metrics['maintainability_index'] = mi_visit(code, multi=True)
            except Exception:
                metrics['maintainability_index'] = 0

            # Depth of Inheritance (for classes)
            try:
                tree = ast.parse(code)
                max_depth = self._calculate_inheritance_depth(tree)
                metrics['max_inheritance_depth'] = max_depth
                metrics['classes'] = len([node for node in ast.walk(tree)
                                          if isinstance(node, ast.ClassDef)])
            except Exception:
                metrics['max_inheritance_depth'] = 0
                metrics['classes'] = 0

            # File path components for module analysis
            path_parts = Path(file_path).parts
            metrics['file_path'] = file_path
            metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root'
            metrics['filename'] = path_parts[-1]

            return metrics

        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
            return None

    def _calculate_inheritance_depth(self, tree: ast.AST) -> int:
        """Return the maximum same-module inheritance depth over all classes."""
        max_depth = 0

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                depth = self._get_class_depth(node, tree)
                max_depth = max(max_depth, depth)

        return max_depth

    def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST,
                         _seen: Optional[frozenset] = None) -> int:
        """Return the inheritance depth of class_node (1 = no local bases).

        Only base classes defined in the same module are followed. ``_seen``
        tracks class names already on the recursion path: mutually
        referencing definitions (``class A(B)`` / ``class B(A)`` parse fine
        at the AST level) previously recursed without bound and raised
        RecursionError.
        """
        seen = (_seen or frozenset()) | {class_node.name}
        if not class_node.bases:
            return 1

        max_base_depth = 0
        for base in class_node.bases:
            if isinstance(base, ast.Name) and base.id not in seen:
                # Find the base class definition in this module
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef) and node.name == base.id:
                        base_depth = self._get_class_depth(node, tree, seen)
                        max_base_depth = max(max_base_depth, base_depth)
                        break

        return max_base_depth + 1

    def analyze_directory(self, directory: str, parallel: bool = True, max_workers: Optional[int] = None) -> List[Dict]:
        """Analyze all Python files under ``directory`` recursively.

        When ``parallel`` is True and more than one file qualifies, files are
        analyzed in a process pool (the work is CPU-bound).
        """
        directory_path = Path(directory)

        # Collect all Python files to analyze
        files_to_analyze = [
            str(file_path) for file_path in directory_path.rglob('*.py')
            if self.should_analyze(str(file_path))
        ]

        if not files_to_analyze:
            return []

        if parallel and len(files_to_analyze) > 1:
            # Parallel analysis
            metrics_list = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # Partial over the module-level wrapper keeps the task picklable.
                analyze_func = partial(_analyze_file_wrapper, exclude_dirs=self.exclude_dirs)
                futures = {executor.submit(analyze_func, file_path): file_path
                           for file_path in files_to_analyze}

                for future in as_completed(futures):
                    try:
                        metrics = future.result()
                        if metrics:
                            metrics_list.append(metrics)
                    except Exception as e:
                        file_path = futures[future]
                        print(f" Warning: Error analyzing {file_path}: {e}")

            return metrics_list
        else:
            # Sequential analysis
            metrics_list = []
            for file_path in files_to_analyze:
                metrics = self.analyze_file(file_path)
                if metrics:
                    metrics_list.append(metrics)
            return metrics_list
+
+
def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]:
    """Analyze one file with a throwaway analyzer.

    Lives at module level (rather than as a bound method) so that
    ProcessPoolExecutor can pickle it for worker processes.
    """
    return CodeAnalyzer(exclude_dirs=exclude_dirs).analyze_file(file_path)
+
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..40fbedf
--- /dev/null
+++ b/config.py
@@ -0,0 +1,64 @@
+"""
+Configuration file for the code metrics analysis project.
+"""
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# GitHub API Configuration
+GITHUB_TOKEN = os.getenv('GITHUB_TOKEN', '') # Set your token in .env file
+GITHUB_API_BASE = 'https://api.github.com'
+
+# Analysis Configuration
+MAX_REPOSITORIES = 10 # Limit number of repos to analyze
+MIN_STARS = 100 # Minimum stars for repository selection
+PYTHON_FILE_EXTENSIONS = ['.py']
+EXCLUDE_DIRS = ['__pycache__', '.git', 'venv', 'env', '.venv', 'node_modules', 'tests', 'test']
+
+# Curated list of popular Python projects that use semantic commits
+# Format: (owner, repo_name)
+CURATED_REPOSITORIES = [
+ # FastAPI - modern web framework
+ ('tiangolo', 'fastapi'),
+ # Requests - HTTP library
+ ('psf', 'requests'),
+ # Django REST Framework - API framework
+ ('encode', 'djangorestframework'),
+ # Flask - web framework
+ ('pallets', 'flask'),
+ # Celery - distributed task queue
+ ('celery', 'celery'),
+ # Pydantic - data validation
+ ('pydantic', 'pydantic'),
+ # SQLAlchemy - SQL toolkit
+ ('sqlalchemy', 'sqlalchemy'),
+ # Pandas - data analysis
+ ('pandas-dev', 'pandas'),
+ # NumPy - numerical computing
+ ('numpy', 'numpy'),
+ # Scikit-learn - machine learning
+ ('scikit-learn', 'scikit-learn'),
+]
+
+# Statistical Analysis Configuration
+SIGNIFICANCE_LEVEL = 0.05
+CONFIDENCE_LEVEL = 0.95
+
+# Output Configuration
+OUTPUT_DIR = 'results'
+FIGURES_DIR = 'figures'
+
+# Data Loading Configuration
+USE_EXISTING_METRICS = False # If True, load from existing raw_metrics.csv instead of collecting new data
+# Set to False to recollect data with fixed cognitive_complexity calculation
+RAW_METRICS_FILE = 'results/raw_metrics.csv' # Path to existing raw metrics CSV file
+
+# Analysis Mode Configuration
+FOCUSED_MODE = True # If True, only perform regression analysis and hypothesis testing (t-tests, z-tests)
+
+# Parallelization Configuration
+MAX_WORKERS = None # None = use CPU count, or set to specific number
+PARALLEL_REPOS = True # Process repositories in parallel
+PARALLEL_FILES = True # Analyze files in parallel
+
diff --git a/data_collector.py b/data_collector.py
new file mode 100644
index 0000000..90b9416
--- /dev/null
+++ b/data_collector.py
@@ -0,0 +1,251 @@
+"""
+Data collection pipeline that combines code metrics with git commit log data.
+"""
+import os
+import subprocess
+import shutil
+import tempfile
+import re
+from typing import List, Dict, Optional, Set
+from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from tqdm import tqdm
+from code_analyzer import CodeAnalyzer
+from config import EXCLUDE_DIRS
+
+
class DataCollector:
    """Collects and combines code metrics with git commit data."""

    def __init__(self, code_analyzer: CodeAnalyzer):
        self.code_analyzer = code_analyzer
        # Lazily-created scratch directory that holds cloned repositories.
        self.temp_dir = None

    def clone_repository(self, owner: str, repo: str) -> Optional[str]:
        """Clone a repository to a temporary directory with full history.

        Returns the clone path, or None if the clone failed.
        """
        if not self.temp_dir:
            self.temp_dir = tempfile.mkdtemp()

        repo_url = f"https://github.com/{owner}/{repo}.git"
        clone_path = os.path.join(self.temp_dir, repo)

        try:
            if os.path.exists(clone_path):
                shutil.rmtree(clone_path)

            # Clone with full history (needed for commit analysis)
            subprocess.run(
                ['git', 'clone', repo_url, clone_path],
                check=True,
                capture_output=True,
                timeout=600
            )
            return clone_path
        except Exception as e:
            print(f"Error cloning {owner}/{repo}: {e}")
            return None

    def get_fix_commits(self, repo_path: str) -> Dict[str, Set[str]]:
        """
        Analyze git commit logs to find commits with 'fix' in the message
        using semantic commit formats (fix:, fix(scope):, Fix:, etc.)
        and track which Python files were changed in those commits.

        Returns a dictionary mapping file paths to sets of commit hashes.
        """
        file_fix_commits: Dict[str, Set[str]] = {}

        try:
            # One log entry per line: "<sha>|<subject>"
            result = subprocess.run(
                ['git', 'log', '--all', '--pretty=format:%H|%s'],
                cwd=repo_path,
                capture_output=True,
                text=True,
                timeout=300
            )

            if result.returncode != 0:
                print(f" Warning: git log failed: {result.stderr}")
                return file_fix_commits

            # Parse commit hashes and check for semantic commit formats
            fix_commit_hashes = []
            for line in result.stdout.strip().split('\n'):
                if '|' in line:
                    parts = line.split('|', 1)
                    commit_hash = parts[0].strip()
                    commit_msg = parts[1] if len(parts) > 1 else ''
                    commit_msg_lower = commit_msg.lower()

                    # Start-anchored semantic commit formats:
                    # fix:, fix(scope):, fixes, fix #123, fixed, fixing.
                    # Matching on the lowercased subject already covers the
                    # Fix:/FIX: case variations, so the old case-sensitive
                    # clauses were redundant and have been dropped.
                    starts_with_fix = (
                        commit_msg_lower.startswith('fix:') or
                        commit_msg_lower.startswith('fix(') or
                        commit_msg_lower.startswith('fixes') or
                        commit_msg_lower.startswith('fix ') or
                        commit_msg_lower.startswith('fixed') or
                        commit_msg_lower.startswith('fixing')
                    )
                    # Substring variations anywhere in the subject.
                    contains_fix = ('bugfix' in commit_msg_lower or
                                    'bug fix' in commit_msg_lower)

                    # Guard only the substring matches against lookalike
                    # words. Previously this exclusion was applied to every
                    # match, so a genuine commit such as
                    # "fix: strip url prefix" was wrongly discarded.
                    lookalike = any(word in commit_msg_lower for word in
                                    ('prefix', 'suffix', 'affix', 'transfix', 'crucifix'))

                    if starts_with_fix or (contains_fix and not lookalike):
                        # Sanity check: a full SHA-1 hash is 40 hex chars.
                        if len(commit_hash) == 40:
                            fix_commit_hashes.append(commit_hash)

            print(f" Found {len(fix_commit_hashes)} fix commits")

            # For each fix commit, get the files that were changed (parallelized)
            def get_commit_files(commit_hash: str) -> List[str]:
                """Get Python files changed in a commit."""
                try:
                    file_result = subprocess.run(
                        ['git', 'show', '--name-only', '--pretty=format:', commit_hash],
                        cwd=repo_path,
                        capture_output=True,
                        text=True,
                        timeout=60
                    )
                    if file_result.returncode == 0:
                        return [f.strip() for f in file_result.stdout.strip().split('\n')
                                if f.strip() and f.strip().endswith('.py')]
                except Exception:
                    # Narrowed from a bare `except:`; a slow or broken
                    # `git show` simply yields no files for this commit.
                    pass
                return []

            # Process commits in parallel (threads are fine: the work is
            # waiting on git subprocesses, which releases the GIL)
            if fix_commit_hashes:
                with ThreadPoolExecutor(max_workers=10) as executor:
                    futures = {executor.submit(get_commit_files, commit_hash): commit_hash
                               for commit_hash in fix_commit_hashes}

                    for future in as_completed(futures):
                        commit_hash = futures[future]
                        try:
                            changed_files = future.result()
                            for file_path in changed_files:
                                if file_path not in file_fix_commits:
                                    file_fix_commits[file_path] = set()
                                file_fix_commits[file_path].add(commit_hash)
                        except Exception as e:
                            print(f" Warning: Error processing commit {commit_hash[:8]}: {e}")

        except Exception as e:
            print(f" Warning: Error analyzing commits: {e}")

        return file_fix_commits

    def count_fixes_per_file(self, repo_path: str, code_metrics: List[Dict]) -> None:
        """
        Count fix commits for each file in code_metrics.
        Updates the metrics dictionaries in place, adding 'fix_count' and
        its alias 'total_fixes' to every entry.
        """
        print(" Analyzing git commit logs for semantic fix commits...")
        file_fix_commits = self.get_fix_commits(repo_path)

        if not file_fix_commits:
            print(" No fix commits found matching semantic commit formats")
            # Set fix_count to 0 for all files
            for metric in code_metrics:
                metric['fix_count'] = 0
                metric['total_fixes'] = 0
            return

        repo_base = Path(repo_path)

        # Create a mapping from relative paths to fix counts
        fix_counts: Dict[str, int] = {}
        for file_path, commits in file_fix_commits.items():
            # Try to normalize the path
            try:
                abs_path = Path(repo_path) / file_path
                if abs_path.exists():
                    rel_path = abs_path.relative_to(repo_base)
                    fix_counts[str(rel_path)] = len(commits)
                    # Also store with forward slashes for matching
                    fix_counts[file_path] = len(commits)
            except (ValueError, OSError):
                # relative_to() raises ValueError; exists() may raise OSError.
                fix_counts[file_path] = len(commits)

        print(f" Found {len(fix_counts)} files with fix commits")

        # Match files to fix counts
        for metric in tqdm(code_metrics, desc=" Matching files with fixes"):
            relative_path = metric.get('relative_path', '')
            filename = metric.get('filename', '')

            fix_count = 0

            # Try multiple matching strategies, most exact first
            if relative_path in fix_counts:
                fix_count = fix_counts[relative_path]
            elif filename in fix_counts:
                fix_count = fix_counts[filename]
            else:
                # Fall back to substring matching.
                # NOTE(review): this can over-match generic names (every
                # package's __init__.py shares a filename) — confirm the
                # exact-path branches above cover the common case.
                for fix_file, count in fix_counts.items():
                    if filename in fix_file or relative_path in fix_file:
                        fix_count = max(fix_count, count)

            metric['fix_count'] = fix_count
            metric['total_fixes'] = fix_count  # Alias for consistency

    def collect_repository_data(self, owner: str, repo: str, parallel_files: bool = True, max_workers: Optional[int] = None) -> Optional[Dict]:
        """Collect all data for a repository.

        Returns a dict with owner, repo, per-file code_metrics, and the
        repository-wide total_fixes, or None if cloning/analysis failed.
        """
        print(f"\nCollecting data for {owner}/{repo}...")

        # Clone repository
        repo_path = self.clone_repository(owner, repo)
        if not repo_path:
            return None

        # Analyze code metrics (parallelized)
        print(" Analyzing code metrics...")
        code_metrics = self.code_analyzer.analyze_directory(repo_path, parallel=parallel_files, max_workers=max_workers)

        if not code_metrics:
            print(f" No Python files found in {owner}/{repo}")
            return None

        # Map file paths to relative paths for matching
        repo_base = Path(repo_path)
        for metric in code_metrics:
            file_path = Path(metric['file_path'])
            try:
                relative_path = file_path.relative_to(repo_base)
                # Normalize to forward slashes so paths match git's output
                metric['relative_path'] = str(relative_path).replace('\\', '/')
            except ValueError:
                # Not under repo_base; keep the original path as a fallback
                metric['relative_path'] = metric['file_path']

        # Count fix commits per file
        self.count_fixes_per_file(repo_path, code_metrics)

        # Get total fix commits count
        total_fixes = sum(metric.get('fix_count', 0) for metric in code_metrics)

        return {
            'owner': owner,
            'repo': repo,
            'code_metrics': code_metrics,
            'total_fixes': total_fixes
        }

    def cleanup(self):
        """Remove the temporary clone directory, if one was created."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            self.temp_dir = None
+
diff --git a/github_client.py b/github_client.py
new file mode 100644
index 0000000..153ca39
--- /dev/null
+++ b/github_client.py
@@ -0,0 +1,131 @@
+"""
+GitHub API client for fetching repository data, issues, and pull requests.
+"""
+import requests
+import time
+from typing import List, Dict, Optional
+from config import GITHUB_TOKEN, GITHUB_API_BASE
+
+
class GitHubClient:
    """Client for interacting with GitHub API."""

    def __init__(self, token: Optional[str] = None):
        self.token = token or GITHUB_TOKEN
        self.headers = {
            'Accept': 'application/vnd.github.v3+json',
        }
        if self.token:
            # Authenticated requests get a much higher rate limit.
            self.headers['Authorization'] = f'token {self.token}'
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def _make_request(self, url: str, params: Optional[Dict] = None) -> Dict:
        """GET a GitHub API URL and return the parsed JSON body.

        On a 403 rate-limit response, sleeps until the reported reset time
        and retries once; a second failure propagates via raise_for_status.
        """
        response = self.session.get(url, params=params)

        # Handle rate limiting
        if response.status_code == 403 and 'rate limit' in response.text.lower():
            reset_time = int(response.headers.get('X-RateLimit-Reset', time.time() + 60))
            wait_time = max(0, reset_time - int(time.time()))
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
            response = self.session.get(url, params=params)

        response.raise_for_status()
        return response.json()

    def search_repositories(self, query: str, sort: str = 'stars', order: str = 'desc',
                            per_page: int = 10) -> List[Dict]:
        """Search for repositories matching the query; returns the item list."""
        url = f"{GITHUB_API_BASE}/search/repositories"
        params = {
            'q': query,
            'sort': sort,
            'order': order,
            'per_page': per_page
        }
        results = self._make_request(url, params=params)
        return results.get('items', [])

    def get_repository_info(self, owner: str, repo: str) -> Dict:
        """Get detailed information about a repository."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}"
        return self._make_request(url)

    def get_repository_issues(self, owner: str, repo: str, state: str = 'all',
                              per_page: int = 100) -> List[Dict]:
        """Get all issues for a repository, paging until exhausted."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/issues"
        all_issues = []
        page = 1

        while True:
            params = {
                'state': state,
                'per_page': per_page,
                'page': page
            }
            issues = self._make_request(url, params=params)

            if not issues:
                break

            # Filter out pull requests (they appear in issues endpoint)
            issues = [issue for issue in issues if 'pull_request' not in issue]
            all_issues.extend(issues)

            # A short page means we've reached the last one.
            if len(issues) < per_page:
                break

            page += 1
            time.sleep(0.5)  # Be nice to the API

        return all_issues

    def get_repository_pulls(self, owner: str, repo: str, state: str = 'all',
                             per_page: int = 100) -> List[Dict]:
        """Get all pull requests for a repository, paging until exhausted."""
        url = f"{GITHUB_API_BASE}/repos/{owner}/{repo}/pulls"
        all_pulls = []
        page = 1

        while True:
            params = {
                'state': state,
                'per_page': per_page,
                'page': page
            }
            pulls = self._make_request(url, params=params)

            if not pulls:
                break

            all_pulls.extend(pulls)

            if len(pulls) < per_page:
                break

            page += 1
            time.sleep(0.5)

        return all_pulls

    def get_file_issues(self, owner: str, repo: str, file_path: str) -> Dict:
        """Get counts of issues and PRs that mention a specific file.

        Best-effort: any failure yields zero counts rather than an exception.
        """
        # Search issues mentioning the file
        query = f'repo:{owner}/{repo} {file_path}'
        url = f"{GITHUB_API_BASE}/search/issues"
        params = {'q': query, 'per_page': 100}

        try:
            results = self._make_request(url, params=params)
            return {
                'issues': len([item for item in results.get('items', [])
                               if 'pull_request' not in item]),
                'pulls': len([item for item in results.get('items', [])
                              if 'pull_request' in item])
            }
        except Exception:
            # Narrowed from a bare `except:`: still best-effort, but no
            # longer swallows KeyboardInterrupt/SystemExit.
            return {'issues': 0, 'pulls': 0}
+
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..0440137
--- /dev/null
+++ b/main.py
@@ -0,0 +1,438 @@
+"""
+Main analysis script for code metrics analysis project.
+Orchestrates data collection, analysis, and visualization.
+"""
+import pandas as pd
+import numpy as np
+import json
+from pathlib import Path
+from typing import Optional, Dict
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from github_client import GitHubClient
+from code_analyzer import CodeAnalyzer
+from data_collector import DataCollector
+from statistical_analysis import StatisticalAnalyzer
+from visualizer import Visualizer
+from config import (
+ MAX_REPOSITORIES, MIN_STARS, EXCLUDE_DIRS,
+ OUTPUT_DIR, FIGURES_DIR, SIGNIFICANCE_LEVEL, CONFIDENCE_LEVEL,
+ CURATED_REPOSITORIES, MAX_WORKERS, PARALLEL_REPOS, PARALLEL_FILES,
+ USE_EXISTING_METRICS, RAW_METRICS_FILE, FOCUSED_MODE
+)
+
+
def main() -> None:
    """Run the end-to-end analysis pipeline.

    Steps: (1) create output directories and analyzers, (2) either load
    cached metrics from RAW_METRICS_FILE or collect fresh data from the
    curated GitHub repositories (optionally in parallel), (3) run the
    statistical analyses (focused or full mode per FOCUSED_MODE), (4)
    persist results as JSON, (5) render figures, (6) print a console
    summary.

    Side effects: creates OUTPUT_DIR and FIGURES_DIR, writes
    raw_metrics.csv / analysis_results.json plus figure files, and prints
    progress to stdout. Returns None.
    """
    print("=" * 80)
    print("Code Metrics Analysis Pipeline")
    print("=" * 80)

    # Create output directories
    Path(OUTPUT_DIR).mkdir(exist_ok=True)
    Path(FIGURES_DIR).mkdir(exist_ok=True)

    # Initialize components
    print("\n1. Initializing components...")
    stat_analyzer = StatisticalAnalyzer(
        significance_level=SIGNIFICANCE_LEVEL,
        confidence_level=CONFIDENCE_LEVEL
    )
    visualizer = Visualizer(output_dir=FIGURES_DIR)

    # Check if we should use existing metrics
    raw_metrics_path = Path(RAW_METRICS_FILE)
    use_existing = USE_EXISTING_METRICS and raw_metrics_path.exists()

    if use_existing:
        print(f"\n2. Loading existing raw metrics from {RAW_METRICS_FILE}...")
        try:
            df = pd.read_csv(raw_metrics_path)
            print(f" ✓ Loaded {len(df)} file metrics from existing data")
            all_metrics = df.to_dict('records')
            repo_summaries = []  # We don't have repo summaries from CSV
        except Exception as e:
            print(f" ✗ Error loading existing metrics: {e}")
            print(" Falling back to data collection...")
            use_existing = False

    if not use_existing:
        # Initialize data collection components
        github_client = GitHubClient()
        code_analyzer = CodeAnalyzer(exclude_dirs=EXCLUDE_DIRS)
        data_collector = DataCollector(code_analyzer)

        # Use curated list of popular Python repositories
        print("\n2. Using curated list of popular Python repositories...")
        repositories_to_analyze = CURATED_REPOSITORIES[:MAX_REPOSITORIES]
        print(f"Selected {len(repositories_to_analyze)} repositories:")
        for owner, repo_name in repositories_to_analyze:
            print(f" - {owner}/{repo_name}")

        # Collect data from repositories (parallelized)
        print("\n3. Collecting data from repositories...")
        all_metrics = []
        repo_summaries = []

        def process_repository(owner: str, repo_name: str) -> Optional[Dict]:
            """Process a single repository; return metrics + summary or None."""
            try:
                print(f" Processing {owner}/{repo_name}...")

                # Get repository info for stars count
                try:
                    repo_info = github_client.get_repository_info(owner, repo_name)
                    stars = repo_info.get('stargazers_count', 0)
                except Exception:
                    # BUG FIX: narrowed from a bare `except:` so Ctrl-C is not
                    # swallowed; star count is cosmetic, default to 0 on error.
                    stars = 0

                repo_data = data_collector.collect_repository_data(
                    owner, repo_name,
                    parallel_files=PARALLEL_FILES,
                    max_workers=MAX_WORKERS
                )

                if repo_data and repo_data['code_metrics']:
                    print(f" ✓ {owner}/{repo_name}: {len(repo_data['code_metrics'])} files, {repo_data['total_fixes']} fixes")
                    return {
                        'metrics': repo_data['code_metrics'],
                        'summary': {
                            'owner': owner,
                            'repo': repo_name,
                            'files_analyzed': len(repo_data['code_metrics']),
                            'total_fixes': repo_data['total_fixes'],
                            'stars': stars
                        }
                    }
                else:
                    print(f" ✗ {owner}/{repo_name}: No data collected")
                    return None
            except Exception as e:
                print(f" ✗ {owner}/{repo_name}: Error - {e}")
                import traceback
                traceback.print_exc()
                return None

        if PARALLEL_REPOS and len(repositories_to_analyze) > 1:
            # Process repositories in parallel
            print(f" Processing {len(repositories_to_analyze)} repositories in parallel...")
            with ThreadPoolExecutor(max_workers=min(len(repositories_to_analyze), MAX_WORKERS or 4)) as executor:
                futures = {executor.submit(process_repository, owner, repo_name): (owner, repo_name)
                           for owner, repo_name in repositories_to_analyze}

                for future in as_completed(futures):
                    owner, repo_name = futures[future]
                    try:
                        result = future.result()
                        if result:
                            all_metrics.extend(result['metrics'])
                            repo_summaries.append(result['summary'])
                    except Exception as e:
                        print(f" ✗ {owner}/{repo_name}: Failed - {e}")
        else:
            # Sequential processing
            for i, (owner, repo_name) in enumerate(repositories_to_analyze, 1):
                print(f"\n[{i}/{len(repositories_to_analyze)}] Processing {owner}/{repo_name}...")
                result = process_repository(owner, repo_name)
                if result:
                    all_metrics.extend(result['metrics'])
                    repo_summaries.append(result['summary'])

        if not all_metrics:
            print("\nNo metrics collected. Exiting.")
            return

        # Convert to DataFrame
        print("\n4. Preparing data for analysis...")
        df = stat_analyzer.prepare_dataframe(all_metrics)
        print(f"Total files analyzed: {len(df)}")
        print(f"Columns: {list(df.columns)}")

        # Save raw data
        df.to_csv(Path(OUTPUT_DIR) / 'raw_metrics.csv', index=False)
        print(f" ✓ Saved raw metrics to {OUTPUT_DIR}/raw_metrics.csv")
    else:
        # Already have DataFrame from CSV
        print("\n3. Preparing data for analysis...")
        df = stat_analyzer.prepare_dataframe(all_metrics)
        print(f"Total files analyzed: {len(df)}")
        print(f"Columns: {list(df.columns)}")

    if len(df) == 0:
        print("\nNo metrics available for analysis. Exiting.")
        return

    # Statistical Analysis.  Step numbers shift by one depending on whether
    # the data-collection step was skipped.
    step_num = "4" if use_existing else "5"
    print(f"\n{step_num}. Performing statistical analysis...")

    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression and Hypothesis Testing Only]")

        # Regression analysis
        print(" - Regression analysis...")
        complexity_features = [
            'loc', 'cyclomatic_complexity', 'cognitive_complexity',
            'max_complexity', 'avg_complexity', 'max_inheritance_depth'
        ]
        regression_results = stat_analyzer.regression_analysis(
            df, complexity_features, target='fix_count'
        )

        # Hypothesis testing - t-tests
        print(" - T-test analysis...")
        t_test_results = stat_analyzer.t_test_analysis(df)

        # Hypothesis testing - z-tests
        print(" - Z-test analysis...")
        z_test_results = stat_analyzer.z_test_analysis(df)

        # Hypothesis testing - module comparisons (ANOVA)
        print(" - Hypothesis testing (ANOVA)...")
        hypothesis_results = stat_analyzer.hypothesis_testing(df)

        # Confidence intervals for key metrics
        print(" - Confidence intervals...")
        ci_results = stat_analyzer.confidence_intervals(
            df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count']
        )

        # Initialize empty results for skipped analyses
        correlation_results = {}
        var_cov_results = {}
        pivot_results = {}
        distribution_results = {}
    else:
        # Full analysis mode
        # Correlation analysis
        print(" - Correlation analysis...")
        correlation_results = stat_analyzer.correlation_analysis(df)

        # Regression analysis
        print(" - Regression analysis...")
        complexity_features = [
            'loc', 'cyclomatic_complexity', 'cognitive_complexity',
            'max_complexity', 'avg_complexity', 'max_inheritance_depth'
        ]
        regression_results = stat_analyzer.regression_analysis(
            df, complexity_features, target='fix_count'
        )

        # Hypothesis testing
        print(" - Hypothesis testing...")
        hypothesis_results = stat_analyzer.hypothesis_testing(df)

        # T-test analysis
        print(" - T-test analysis...")
        t_test_results = stat_analyzer.t_test_analysis(df)

        # Z-test analysis
        print(" - Z-test analysis...")
        z_test_results = stat_analyzer.z_test_analysis(df)

        # Confidence intervals
        print(" - Confidence intervals...")
        ci_results = stat_analyzer.confidence_intervals(
            df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count']
        )

        # Variance-covariance analysis
        print(" - Variance-covariance analysis...")
        var_cov_results = stat_analyzer.variance_covariance_analysis(df)

        # Pivot table analysis
        print(" - Pivot table analysis...")
        pivot_results = stat_analyzer.pivot_table_analysis(df)

        # Discrete distribution analysis
        print(" - Discrete distribution analysis...")
        distribution_results = stat_analyzer.discrete_distribution_analysis(df)

    # Save analysis results
    step_num = "5" if use_existing else "6"
    print(f"\n{step_num}. Saving analysis results...")
    results = {
        'correlation_analysis': correlation_results,
        'regression_analysis': regression_results,
        'hypothesis_testing': hypothesis_results,
        't_test_analysis': t_test_results if 't_test_results' in locals() else {},
        'z_test_analysis': z_test_results if 'z_test_results' in locals() else {},
        'confidence_intervals': ci_results,
        'variance_covariance': var_cov_results,
        'pivot_tables': {k: v.to_dict() if isinstance(v, pd.DataFrame) else v
                         for k, v in pivot_results.items()},
        'distribution_analysis': distribution_results,
        'repository_summaries': repo_summaries,
        'analysis_mode': 'focused' if FOCUSED_MODE else 'full'
    }

    # Convert numpy types to native Python types for JSON serialization
    def convert_to_serializable(obj):
        """Recursively coerce numpy scalars/arrays and DataFrames to JSON-safe types."""
        if isinstance(obj, (np.integer, np.floating)):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, dict):
            return {k: convert_to_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_to_serializable(item) for item in obj]
        elif isinstance(obj, pd.DataFrame):
            return obj.to_dict()
        return obj

    results_serializable = convert_to_serializable(results)

    # default=str is a deliberate last-resort: stringify anything the
    # converter above did not handle rather than failing the whole dump.
    with open(Path(OUTPUT_DIR) / 'analysis_results.json', 'w') as f:
        json.dump(results_serializable, f, indent=2, default=str)
    print(f" ✓ Saved analysis results to {OUTPUT_DIR}/analysis_results.json")

    # Create visualizations
    print("\n" + ("6" if use_existing else "7") + ". Creating visualizations...")

    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression visualizations only]")

        # Scatter plots for key relationships
        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count',
            'loc_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )

        # Regression results
        if regression_results:
            print(" - Regression results...")
            visualizer.plot_regression_results(regression_results)
    else:
        # Full visualization mode
        # Correlation heatmap
        print(" - Correlation heatmap...")
        visualizer.plot_correlation_heatmap(
            df,
            complexity_cols=['loc', 'cyclomatic_complexity', 'cognitive_complexity',
                             'max_complexity', 'avg_complexity'],
            issue_cols=['fix_count', 'total_fixes']
        )

        # Scatter plots
        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cyclomatic_complexity', 'fix_count',
            'cyclomatic_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count',
            'loc_vs_fixes.png'
        )

        # Module comparison
        print(" - Module comparison...")
        visualizer.plot_module_complexity_comparison(
            df, 'cyclomatic_complexity'
        )

        # Distribution analysis
        print(" - Distribution analysis...")
        visualizer.plot_distribution_analysis(df, 'cyclomatic_complexity')
        visualizer.plot_distribution_analysis(df, 'fix_count',
                                              'fix_distribution.png')

        # Regression results
        if regression_results:
            print(" - Regression results...")
            visualizer.plot_regression_results(regression_results)

    print(f" ✓ Saved visualizations to {FIGURES_DIR}/")

    # Print summary statistics
    step_num = "7" if use_existing else "8"
    print(f"\n{step_num}. Summary Statistics")
    print("=" * 80)
    print(f"\nTotal files analyzed: {len(df)}")
    if repo_summaries:
        print(f"Total repositories: {len(repo_summaries)}")

    print("\nComplexity Metrics (Mean ± Std):")
    for metric in ['loc', 'cyclomatic_complexity', 'cognitive_complexity',
                   'max_complexity', 'avg_complexity']:
        if metric in df.columns:
            mean_val = df[metric].mean()
            std_val = df[metric].std()
            print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}")

    print("\nFix Metrics (Mean ± Std):")
    for metric in ['fix_count', 'total_fixes']:
        if metric in df.columns:
            mean_val = df[metric].mean()
            std_val = df[metric].std()
            print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}")

    if not FOCUSED_MODE:
        print("\nSignificant Correlations:")
        sig_corr = correlation_results.get('significant_correlations', {})
        if sig_corr:
            for metric, corr_value in list(sig_corr.items())[:10]:
                print(f" {metric}: {corr_value:.3f}")
        else:
            print(" None found")

    if regression_results:
        print(f"\nRegression Analysis:")
        print(f" R²: {regression_results.get('r_squared', 0):.3f}")
        print(f" RMSE: {regression_results.get('rmse', 0):.3f}")
        if regression_results.get('significant_features'):
            print(f" Significant features: {regression_results['significant_features']}")

    # T-test results
    if 't_test_results' in locals() and t_test_results:
        print(f"\nT-Test Results (High-fix vs Low-fix files):")
        for metric, result in t_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" t-statistic: {result.get('t_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" High-fix mean: {result.get('high_fix_mean', 0):.2f}")
                print(f" Low-fix mean: {result.get('low_fix_mean', 0):.2f}")

    # Z-test results
    if 'z_test_results' in locals() and z_test_results:
        print(f"\nZ-Test Results (High-fix vs Low-fix files):")
        for metric, result in z_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" z-statistic: {result.get('z_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" Mean difference: {result.get('mean_difference', 0):.2f}")
                print(f" 95% CI: [{result.get('ci_lower', 0):.2f}, {result.get('ci_upper', 0):.2f}]")

    # Hypothesis testing results
    if hypothesis_results:
        print(f"\nHypothesis Testing (ANOVA/Kruskal-Wallis):")
        for test_name, result in list(hypothesis_results.items())[:5]:
            if isinstance(result, dict) and 'p_value' in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {test_name}: p={result.get('p_value', 1):.4f} {sig_marker}")

    print("\n" + "=" * 80)
    print("Analysis complete!")
    print(f"Results saved to: {OUTPUT_DIR}/")
    print(f"Figures saved to: {FIGURES_DIR}/")
    print("=" * 80)

    # Cleanup (only if we collected data)
    if not use_existing:
        data_collector.cleanup()
+
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..e5b3df3
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,13 @@
+requests>=2.31.0
+radon>=6.0.1
+lizard>=1.17.10
+pandas>=2.0.0
+numpy>=1.24.0
+scipy>=1.11.0
+matplotlib>=3.7.0
+seaborn>=0.12.0
+scikit-learn>=1.3.0
+PyGithub>=1.59.0
+tqdm>=4.65.0
+python-dotenv>=1.0.0
+
diff --git a/statistical_analysis.py b/statistical_analysis.py
new file mode 100644
index 0000000..2a50f4e
--- /dev/null
+++ b/statistical_analysis.py
@@ -0,0 +1,553 @@
+"""
+Statistical analysis module for code metrics and issue data.
+"""
+import pandas as pd
+import numpy as np
+from scipy import stats
+from scipy.stats import pearsonr, spearmanr, chi2_contingency
+from sklearn.linear_model import LinearRegression
+from sklearn.preprocessing import StandardScaler
+from sklearn.feature_selection import VarianceThreshold
+from typing import Dict, List, Tuple
+import warnings
+warnings.filterwarnings('ignore')
+
+
+class StatisticalAnalyzer:
+ """Performs statistical analysis on code metrics data."""
+
+ def __init__(self, significance_level: float = 0.05, confidence_level: float = 0.95):
+ self.significance_level = significance_level
+ self.confidence_level = confidence_level
+
+ def prepare_dataframe(self, data: List[Dict]) -> pd.DataFrame:
+ """Convert list of metrics dictionaries to DataFrame."""
+ df = pd.DataFrame(data)
+ return df
+
+ def correlation_analysis(self, df: pd.DataFrame) -> Dict:
+ """Perform correlation analysis between complexity metrics and issues."""
+ results = {}
+
+ # Select numeric columns for correlation
+ complexity_metrics = [
+ 'loc', 'lloc', 'sloc', 'cyclomatic_complexity',
+ 'cognitive_complexity', 'max_complexity', 'avg_complexity',
+ 'max_inheritance_depth', 'maintainability_index'
+ ]
+
+ issue_metrics = ['fix_count', 'total_fixes']
+
+ # Filter to columns that exist
+ complexity_cols = [col for col in complexity_metrics if col in df.columns]
+ issue_cols = [col for col in issue_metrics if col in df.columns]
+
+ correlations = {}
+ p_values = {}
+
+ for comp_col in complexity_cols:
+ for issue_col in issue_cols:
+ # Remove NaN values
+ mask = df[[comp_col, issue_col]].notna().all(axis=1)
+ if mask.sum() < 3: # Need at least 3 data points
+ continue
+
+ x = df.loc[mask, comp_col]
+ y = df.loc[mask, issue_col]
+
+ # Pearson correlation
+ pearson_r, pearson_p = pearsonr(x, y)
+ correlations[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_r
+ p_values[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_p
+
+ # Spearman correlation (non-parametric)
+ spearman_r, spearman_p = spearmanr(x, y)
+ correlations[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_r
+ p_values[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_p
+
+ results['correlations'] = correlations
+ results['p_values'] = p_values
+ results['significant_correlations'] = {
+ k: v for k, v in correlations.items()
+ if p_values.get(k.replace('_pearson', '_pearson').replace('_spearman', '_spearman'), 1) < self.significance_level
+ }
+
+ return results
+
+ def regression_analysis(self, df: pd.DataFrame,
+ complexity_features: List[str],
+ target: str = 'fix_count') -> Dict:
+ """Perform regression analysis to predict fix count from complexity."""
+ results = {}
+
+ # Prepare features
+ feature_cols = [col for col in complexity_features if col in df.columns]
+ if not feature_cols:
+ return results
+
+ # Remove rows with missing values
+ mask = df[feature_cols + [target]].notna().all(axis=1)
+ if mask.sum() < len(feature_cols) + 1:
+ return results
+
+ X = df.loc[mask, feature_cols]
+ y = df.loc[mask, target]
+
+ # Check for multicollinearity - remove highly correlated features
+ if len(feature_cols) > 1:
+ corr_matrix = X.corr().abs()
+ upper_triangle = corr_matrix.where(
+ np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
+ )
+ # Find features with correlation > 0.95
+ high_corr_features = [column for column in upper_triangle.columns
+ if any(upper_triangle[column] > 0.95)]
+ if high_corr_features:
+ # Keep the first feature, remove others
+ features_to_remove = high_corr_features
+ feature_cols = [f for f in feature_cols if f not in features_to_remove]
+ X = X[feature_cols]
+
+ if len(feature_cols) == 0:
+ return results
+
+ # Standardize features
+ scaler = StandardScaler()
+ X_scaled = scaler.fit_transform(X)
+
+ # Fit linear regression
+ model = LinearRegression()
+ model.fit(X_scaled, y)
+
+ # Predictions
+ y_pred = model.predict(X_scaled)
+
+ # Calculate metrics
+ r_squared = model.score(X_scaled, y)
+ mse = np.mean((y - y_pred) ** 2)
+ rmse = np.sqrt(mse)
+
+ # Coefficients
+ coefficients = dict(zip(feature_cols, model.coef_))
+ intercept = model.intercept_
+
+ # Confidence intervals for coefficients
+ n = len(y)
+ p = len(feature_cols)
+ residuals = y - y_pred
+ mse_residual = np.sum(residuals ** 2) / (n - p - 1)
+
+ # Standard errors - handle singular matrix
+ X_with_intercept = np.column_stack([np.ones(n), X_scaled])
+ XTX = X_with_intercept.T @ X_with_intercept
+
+ try:
+ # Check if matrix is singular or near-singular
+ if np.linalg.cond(XTX) > 1e12:
+ # Use pseudo-inverse for near-singular matrices
+ cov_matrix = mse_residual * np.linalg.pinv(XTX)
+ else:
+ cov_matrix = mse_residual * np.linalg.inv(XTX)
+ std_errors = np.sqrt(np.diag(cov_matrix))[1:] # Skip intercept
+
+ # t-statistics and p-values
+ t_stats = model.coef_ / std_errors
+ p_values = 2 * (1 - stats.t.cdf(np.abs(t_stats), n - p - 1))
+
+ # Confidence intervals
+ alpha = 1 - self.confidence_level
+ t_critical = stats.t.ppf(1 - alpha/2, n - p - 1)
+ ci_lower = model.coef_ - t_critical * std_errors
+ ci_upper = model.coef_ + t_critical * std_errors
+ except (np.linalg.LinAlgError, ValueError):
+ # If still singular, use pseudo-inverse
+ try:
+ cov_matrix = mse_residual * np.linalg.pinv(XTX)
+ std_errors = np.sqrt(np.diag(cov_matrix))[1:]
+ # Handle potential NaN values
+ std_errors = np.where(np.isnan(std_errors) | (std_errors == 0),
+ np.inf, std_errors)
+ t_stats = model.coef_ / std_errors
+ p_values = np.where(np.isfinite(t_stats),
+ 2 * (1 - stats.t.cdf(np.abs(t_stats), n - p - 1)),
+ np.nan)
+ alpha = 1 - self.confidence_level
+ t_critical = stats.t.ppf(1 - alpha/2, n - p - 1)
+ ci_lower = model.coef_ - t_critical * std_errors
+ ci_upper = model.coef_ + t_critical * std_errors
+ except:
+ # If all else fails, set defaults
+ std_errors = np.full(len(feature_cols), np.nan)
+ p_values = np.full(len(feature_cols), np.nan)
+ ci_lower = np.full(len(feature_cols), np.nan)
+ ci_upper = np.full(len(feature_cols), np.nan)
+
+ results['r_squared'] = r_squared
+ results['rmse'] = rmse
+ results['coefficients'] = coefficients
+ results['intercept'] = intercept
+ results['p_values'] = dict(zip(feature_cols, p_values))
+ results['confidence_intervals'] = {
+ col: (lower, upper) for col, lower, upper in
+ zip(feature_cols, ci_lower, ci_upper)
+ }
+ results['significant_features'] = [
+ col for col, p_val in zip(feature_cols, p_values)
+ if p_val < self.significance_level
+ ]
+
+ return results
+
+ def hypothesis_testing(self, df: pd.DataFrame) -> Dict:
+ """Perform hypothesis tests comparing complexity across modules."""
+ results = {}
+
+ if 'module' not in df.columns:
+ return results
+
+ # Test: Do different modules have significantly different complexity?
+ modules = df['module'].unique()
+ if len(modules) < 2:
+ return results
+
+ complexity_metrics = [
+ 'cyclomatic_complexity', 'cognitive_complexity',
+ 'avg_complexity', 'loc'
+ ]
+
+ for metric in complexity_metrics:
+ if metric not in df.columns:
+ continue
+
+ # Remove NaN values
+ data_by_module = [
+ df[df['module'] == module][metric].dropna().values
+ for module in modules
+ if len(df[df['module'] == module][metric].dropna()) > 0
+ ]
+
+ if len(data_by_module) < 2:
+ continue
+
+ # One-way ANOVA
+ try:
+ f_stat, p_value = stats.f_oneway(*data_by_module)
+ results[f'{metric}_anova'] = {
+ 'f_statistic': float(f_stat),
+ 'p_value': float(p_value),
+ 'significant': p_value < self.significance_level
+ }
+ except:
+ pass
+
+ # Kruskal-Wallis (non-parametric alternative)
+ try:
+ h_stat, p_value_kw = stats.kruskal(*data_by_module)
+ results[f'{metric}_kruskal_wallis'] = {
+ 'h_statistic': float(h_stat),
+ 'p_value': float(p_value_kw),
+ 'significant': p_value_kw < self.significance_level
+ }
+ except:
+ pass
+
+ return results
+
+ def t_test_analysis(self, df: pd.DataFrame) -> Dict:
+ """
+ Perform t-tests to compare complexity metrics between high-fix and low-fix files.
+ """
+ results = {}
+
+ if 'fix_count' not in df.columns:
+ return results
+
+ # Split files into high-fix and low-fix groups
+ median_fixes = df['fix_count'].median()
+ high_fix_mask = df['fix_count'] > median_fixes
+ low_fix_mask = df['fix_count'] <= median_fixes
+
+ complexity_metrics = [
+ 'loc', 'cyclomatic_complexity', 'cognitive_complexity',
+ 'avg_complexity', 'max_complexity', 'max_inheritance_depth'
+ ]
+
+ for metric in complexity_metrics:
+ if metric not in df.columns:
+ continue
+
+ high_fix_data = df.loc[high_fix_mask, metric].dropna()
+ low_fix_data = df.loc[low_fix_mask, metric].dropna()
+
+ if len(high_fix_data) < 2 or len(low_fix_data) < 2:
+ continue
+
+ # Independent samples t-test (assuming unequal variances)
+ try:
+ t_stat, p_value = stats.ttest_ind(high_fix_data, low_fix_data,
+ equal_var=False)
+ results[f'{metric}_t_test'] = {
+ 't_statistic': float(t_stat),
+ 'p_value': float(p_value),
+ 'significant': p_value < self.significance_level,
+ 'high_fix_mean': float(high_fix_data.mean()),
+ 'low_fix_mean': float(low_fix_data.mean()),
+ 'high_fix_std': float(high_fix_data.std()),
+ 'low_fix_std': float(low_fix_data.std()),
+ 'high_fix_n': len(high_fix_data),
+ 'low_fix_n': len(low_fix_data)
+ }
+ except Exception as e:
+ results[f'{metric}_t_test'] = {'error': str(e)}
+
+ return results
+
+ def z_test_analysis(self, df: pd.DataFrame) -> Dict:
+ """
+ Perform z-tests to compare complexity metrics between high-fix and low-fix files.
+ Z-test assumes known population variance (uses sample variance as approximation).
+ """
+ results = {}
+
+ if 'fix_count' not in df.columns:
+ return results
+
+ # Split files into high-fix and low-fix groups
+ median_fixes = df['fix_count'].median()
+ high_fix_mask = df['fix_count'] > median_fixes
+ low_fix_mask = df['fix_count'] <= median_fixes
+
+ complexity_metrics = [
+ 'loc', 'cyclomatic_complexity', 'cognitive_complexity',
+ 'avg_complexity', 'max_complexity', 'max_inheritance_depth'
+ ]
+
+ for metric in complexity_metrics:
+ if metric not in df.columns:
+ continue
+
+ high_fix_data = df.loc[high_fix_mask, metric].dropna()
+ low_fix_data = df.loc[low_fix_mask, metric].dropna()
+
+ if len(high_fix_data) < 30 or len(low_fix_data) < 30:
+ # Z-test requires large sample sizes (n >= 30)
+ continue
+
+ try:
+ # Calculate means and standard errors
+ mean1 = high_fix_data.mean()
+ mean2 = low_fix_data.mean()
+ std1 = high_fix_data.std()
+ std2 = low_fix_data.std()
+ n1 = len(high_fix_data)
+ n2 = len(low_fix_data)
+
+ # Standard error of the difference
+ se_diff = np.sqrt((std1**2 / n1) + (std2**2 / n2))
+
+ # Z-statistic
+ z_stat = (mean1 - mean2) / se_diff
+
+ # Two-tailed p-value
+ p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
+
+ # Confidence interval for difference
+ alpha = 1 - self.confidence_level
+ z_critical = stats.norm.ppf(1 - alpha/2)
+ ci_lower = (mean1 - mean2) - z_critical * se_diff
+ ci_upper = (mean1 - mean2) + z_critical * se_diff
+
+ results[f'{metric}_z_test'] = {
+ 'z_statistic': float(z_stat),
+ 'p_value': float(p_value),
+ 'significant': p_value < self.significance_level,
+ 'high_fix_mean': float(mean1),
+ 'low_fix_mean': float(mean2),
+ 'mean_difference': float(mean1 - mean2),
+ 'ci_lower': float(ci_lower),
+ 'ci_upper': float(ci_upper),
+ 'high_fix_n': n1,
+ 'low_fix_n': n2
+ }
+ except Exception as e:
+ results[f'{metric}_z_test'] = {'error': str(e)}
+
+ return results
+
+ def confidence_intervals(self, df: pd.DataFrame,
+ metrics: List[str]) -> Dict:
+ """Calculate confidence intervals for various metrics."""
+ results = {}
+
+ alpha = 1 - self.confidence_level
+
+ for metric in metrics:
+ if metric not in df.columns:
+ continue
+
+ data = df[metric].dropna()
+ if len(data) < 2:
+ continue
+
+ # Calculate mean and standard error
+ mean = data.mean()
+ std_err = stats.sem(data)
+
+ # t-distribution confidence interval
+ t_critical = stats.t.ppf(1 - alpha/2, len(data) - 1)
+ ci_lower = mean - t_critical * std_err
+ ci_upper = mean + t_critical * std_err
+
+ results[metric] = {
+ 'mean': mean,
+ 'std': data.std(),
+ 'ci_lower': ci_lower,
+ 'ci_upper': ci_upper,
+ 'confidence_level': self.confidence_level
+ }
+
+ return results
+
+ def variance_covariance_analysis(self, df: pd.DataFrame) -> Dict:
+ """Calculate variance-covariance matrix for complexity metrics."""
+ results = {}
+
+ complexity_metrics = [
+ 'loc', 'cyclomatic_complexity', 'cognitive_complexity',
+ 'max_complexity', 'avg_complexity', 'max_inheritance_depth'
+ ]
+
+ metric_cols = [col for col in complexity_metrics if col in df.columns]
+
+ if len(metric_cols) < 2:
+ return results
+
+ # Remove rows with missing values
+ data = df[metric_cols].dropna()
+
+ if len(data) < 2:
+ return results
+
+ # Calculate covariance matrix
+ cov_matrix = data.cov()
+ corr_matrix = data.corr()
+
+ results['covariance_matrix'] = cov_matrix
+ results['correlation_matrix'] = corr_matrix
+ results['variances'] = data.var().to_dict()
+
+ return results
+
+ def pivot_table_analysis(self, df: pd.DataFrame) -> Dict:
+ """Create pivot tables for cross-tabulation analysis."""
+ results = {}
+
+ if 'module' not in df.columns:
+ return results
+
+ # Create complexity categories
+ if 'cyclomatic_complexity' in df.columns:
+ df['complexity_category'] = pd.cut(
+ df['cyclomatic_complexity'],
+ bins=[0, 10, 25, 50, float('inf')],
+ labels=['Low', 'Medium', 'High', 'Very High']
+ )
+
+ # Pivot: Module vs Complexity Category
+ pivot = pd.crosstab(df['module'], df['complexity_category'],
+ values=df['cyclomatic_complexity'],
+ aggfunc='mean')
+ results['module_complexity_pivot'] = pivot
+
+ # Pivot: Module vs Fix Count
+ if 'fix_count' in df.columns:
+ pivot_fixes = pd.crosstab(
+ df['module'],
+ pd.cut(df['fix_count'],
+ bins=[0, 1, 5, 10, float('inf')],
+ labels=['None', 'Low', 'Medium', 'High']),
+ values=df['fix_count'],
+ aggfunc='mean'
+ )
+ results['module_fixes_pivot'] = pivot_fixes
+
+ return results
+
+ def discrete_distribution_analysis(self, df: pd.DataFrame) -> Dict:
+ """Analyze discrete distributions of fix counts."""
+ results = {}
+
+ if 'fix_count' not in df.columns:
+ return results
+
+ issue_counts = df['fix_count'].dropna()
+
+ # Fit Poisson distribution
+ lambda_poisson = issue_counts.mean()
+ poisson_dist = stats.poisson(lambda_poisson)
+
+ # Chi-square goodness of fit test
+ observed_freq = issue_counts.value_counts().sort_index()
+ max_observed = int(observed_freq.index.max())
+
+ # Create bins for chi-square test
+ # Use bins that ensure expected frequency >= 5
+ observed_array = []
+ expected_array = []
+
+ # Start from 0 and go up to max_observed
+ for k in range(max_observed + 1):
+ obs_count = observed_freq.get(k, 0)
+ exp_count = poisson_dist.pmf(k) * len(issue_counts)
+
+ # Only include if expected frequency >= 5
+ if exp_count >= 5:
+ observed_array.append(obs_count)
+ expected_array.append(exp_count)
+
+ # If we have bins, perform the test
+ if len(observed_array) > 0 and len(expected_array) > 0:
+ # Normalize expected frequencies to match observed sum
+ observed_sum = sum(observed_array)
+ expected_sum = sum(expected_array)
+
+ if expected_sum > 0:
+ # Scale expected frequencies to match observed sum
+ expected_array = np.array(expected_array) * (observed_sum / expected_sum)
+ observed_array = np.array(observed_array)
+
+ # Ensure sums match (within tolerance)
+ if abs(sum(observed_array) - sum(expected_array)) < 1e-6:
+ try:
+ chi2_stat, p_value = stats.chisquare(
+ observed_array,
+ expected_array
+ )
+
+ results['poisson_fit'] = {
+ 'lambda': lambda_poisson,
+ 'chi2_statistic': float(chi2_stat),
+ 'p_value': float(p_value),
+ 'fits': p_value >= self.significance_level
+ }
+ except (ValueError, RuntimeError) as e:
+ # If chi-square test fails, skip it
+ results['poisson_fit'] = {
+ 'lambda': lambda_poisson,
+ 'chi2_statistic': None,
+ 'p_value': None,
+ 'fits': None,
+ 'error': str(e)
+ }
+
+ # Summary statistics
+ results['distribution_summary'] = {
+ 'mean': issue_counts.mean(),
+ 'variance': issue_counts.var(),
+ 'std': issue_counts.std(),
+ 'skewness': stats.skew(issue_counts),
+ 'kurtosis': stats.kurtosis(issue_counts)
+ }
+
+ return results
+
diff --git a/visualizer.py b/visualizer.py
new file mode 100644
index 0000000..6608037
--- /dev/null
+++ b/visualizer.py
@@ -0,0 +1,255 @@
+"""
+Visualization module for code metrics analysis results.
+"""
+import matplotlib.pyplot as plt
+import seaborn as sns
+import pandas as pd
+import numpy as np
+from pathlib import Path
+from typing import Dict, List, Optional
+import os
+
+
class Visualizer:
    """Creates and saves visualizations for code metrics analysis.

    Every plot method writes a 300-dpi PNG into ``output_dir`` and closes
    the figure afterwards.  Methods return early (producing no file) when
    the required columns are absent or there is too little valid data to
    draw a meaningful plot.
    """

    def __init__(self, output_dir: str = 'figures'):
        """Create the output directory and apply global plot styling.

        Args:
            output_dir: Directory in which figure files are written;
                created if it does not already exist.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        # Global defaults shared by all plots produced by this instance.
        sns.set_style("whitegrid")
        plt.rcParams['figure.figsize'] = (12, 8)

    def _save(self, filename: str) -> None:
        """Tighten layout, save the current figure to ``output_dir``, close it."""
        plt.tight_layout()
        plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
        plt.close()

    def plot_correlation_heatmap(self, df: pd.DataFrame,
                                 complexity_cols: List[str],
                                 issue_cols: List[str],
                                 filename: str = 'correlation_heatmap.png'):
        """Create a heatmap of Pearson correlations between complexity and
        issue metrics.

        Cells with fewer than 3 paired observations (or an undefined
        correlation) are left blank via the NaN mask.  No file is written
        if no valid correlation exists.

        Args:
            df: Data with one row per file/module.
            complexity_cols: Candidate complexity metric column names.
            issue_cols: Candidate issue metric column names.
            filename: Output PNG file name inside ``output_dir``.
        """
        # Keep only columns that actually exist in the frame.
        comp_cols = [col for col in complexity_cols if col in df.columns]
        iss_cols = [col for col in issue_cols if col in df.columns]
        if not comp_cols or not iss_cols:
            return

        corr_matrix = np.full((len(comp_cols), len(iss_cols)), np.nan)
        for i, comp_col in enumerate(comp_cols):
            for j, iss_col in enumerate(iss_cols):
                mask = df[[comp_col, iss_col]].notna().all(axis=1)
                # Require at least 3 paired points for a meaningful r.
                if mask.sum() >= 3:
                    corr = df.loc[mask, comp_col].corr(
                        df.loc[mask, iss_col], method='pearson')
                    if not np.isnan(corr):
                        corr_matrix[i, j] = corr

        if np.isnan(corr_matrix).all():
            return  # nothing valid to show

        fig, ax = plt.subplots(figsize=(14, 10))
        sns.heatmap(corr_matrix, annot=True, fmt='.3f', cmap='coolwarm',
                    center=0, vmin=-1, vmax=1, ax=ax,
                    xticklabels=iss_cols, yticklabels=comp_cols,
                    mask=np.isnan(corr_matrix))
        ax.set_title('Correlation Heatmap: Complexity Metrics vs Issue Metrics',
                     fontsize=16, fontweight='bold')
        self._save(filename)

    def plot_complexity_vs_issues_scatter(self, df: pd.DataFrame,
                                          complexity_col: str,
                                          issue_col: str,
                                          filename: Optional[str] = None):
        """Scatter plot of a complexity metric against an issue metric,
        with a least-squares trend line when the x values vary.

        Args:
            df: Data with one row per file/module.
            complexity_col: Column plotted on the x axis.
            issue_col: Column plotted on the y axis.
            filename: Output PNG name; if None the figure is drawn but
                not saved (it is still closed).
        """
        if complexity_col not in df.columns or issue_col not in df.columns:
            return

        mask = df[[complexity_col, issue_col]].notna().all(axis=1)
        if mask.sum() < 3:
            return

        x = df.loc[mask, complexity_col]
        y = df.loc[mask, issue_col]

        # Drop infinite/NaN values BEFORE creating the figure: the original
        # code returned after plt.subplots() without closing, leaking an
        # open matplotlib figure on each degenerate input.
        valid_mask = np.isfinite(x) & np.isfinite(y)
        x = x[valid_mask]
        y = y[valid_mask]
        if len(x) < 2:
            return

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.scatter(x, y, alpha=0.5, s=50)

        # Fit a trend line only when x is not (numerically) constant.
        has_trend = False
        try:
            if x.std() > 1e-10:
                z = np.polyfit(x, y, 1)
                p = np.poly1d(z)
                x_sorted = np.sort(x)
                ax.plot(x_sorted, p(x_sorted), "r--", alpha=0.8, linewidth=2,
                        label=f'Trend line (slope={z[0]:.3f})')
                has_trend = True
        except (np.linalg.LinAlgError, ValueError, RuntimeError):
            # Skip the regression line if fitting fails.
            pass

        ax.set_xlabel(complexity_col.replace('_', ' ').title(), fontsize=12)
        ax.set_ylabel(issue_col.replace('_', ' ').title(), fontsize=12)
        ax.set_title(f'{complexity_col.replace("_", " ").title()} vs '
                     f'{issue_col.replace("_", " ").title()}',
                     fontsize=14, fontweight='bold')
        if has_trend:
            # legend() with no labelled artists raises a matplotlib
            # warning, so only call it when the trend line exists.
            ax.legend()
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        if filename:
            plt.savefig(self.output_dir / filename, dpi=300, bbox_inches='tight')
        plt.close()

    def plot_module_complexity_comparison(self, df: pd.DataFrame,
                                          complexity_col: str,
                                          filename: str = 'module_complexity_comparison.png'):
        """Compare a complexity metric across modules.

        Produces a two-panel figure: a per-module box plot on top and a
        bar chart of per-module means (descending) below.  Requires a
        ``module`` column in ``df``.

        Args:
            df: Data with one row per file/module.
            complexity_col: Metric column to compare.
            filename: Output PNG file name inside ``output_dir``.
        """
        if 'module' not in df.columns or complexity_col not in df.columns:
            return

        mask = df[complexity_col].notna()
        if mask.sum() == 0:
            return

        fig, axes = plt.subplots(2, 1, figsize=(14, 10))

        # Top panel: distribution per module.
        modules = df.loc[mask, 'module'].unique()
        data_by_module = [df.loc[mask & (df['module'] == mod), complexity_col].values
                          for mod in modules]

        axes[0].boxplot(data_by_module, labels=modules)
        axes[0].set_ylabel(complexity_col.replace('_', ' ').title(), fontsize=12)
        axes[0].set_title(f'{complexity_col.replace("_", " ").title()} by Module',
                          fontsize=14, fontweight='bold')
        axes[0].tick_params(axis='x', rotation=45)
        axes[0].grid(True, alpha=0.3)

        # Bottom panel: mean per module, largest first.
        module_means = (df.loc[mask]
                        .groupby('module')[complexity_col]
                        .mean()
                        .sort_values(ascending=False))
        axes[1].bar(range(len(module_means)), module_means.values)
        axes[1].set_xticks(range(len(module_means)))
        axes[1].set_xticklabels(module_means.index, rotation=45, ha='right')
        axes[1].set_ylabel(f'Mean {complexity_col.replace("_", " ").title()}', fontsize=12)
        axes[1].set_title(f'Average {complexity_col.replace("_", " ").title()} by Module',
                          fontsize=14, fontweight='bold')
        axes[1].grid(True, alpha=0.3, axis='y')

        self._save(filename)

    def plot_distribution_analysis(self, df: pd.DataFrame,
                                   metric_col: str,
                                   filename: str = 'distribution_analysis.png'):
        """Plot a four-panel distribution analysis for a metric:
        histogram, normal Q-Q plot, box plot, and empirical CDF.

        Args:
            df: Data containing ``metric_col``.
            metric_col: Column whose distribution is analyzed.
            filename: Output PNG file name inside ``output_dir``.
        """
        if metric_col not in df.columns:
            return

        data = df[metric_col].dropna()
        if len(data) == 0:
            return

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # Histogram.
        axes[0, 0].hist(data, bins=30, edgecolor='black', alpha=0.7)
        axes[0, 0].set_xlabel(metric_col.replace('_', ' ').title(), fontsize=12)
        axes[0, 0].set_ylabel('Frequency', fontsize=12)
        axes[0, 0].set_title('Histogram', fontsize=12, fontweight='bold')
        axes[0, 0].grid(True, alpha=0.3)

        # Q-Q plot against the normal distribution.
        from scipy import stats
        stats.probplot(data, dist="norm", plot=axes[0, 1])
        axes[0, 1].set_title('Q-Q Plot (Normal Distribution)', fontsize=12, fontweight='bold')
        axes[0, 1].grid(True, alpha=0.3)

        # Box plot.
        axes[1, 0].boxplot(data, vert=True)
        axes[1, 0].set_ylabel(metric_col.replace('_', ' ').title(), fontsize=12)
        axes[1, 0].set_title('Box Plot', fontsize=12, fontweight='bold')
        axes[1, 0].grid(True, alpha=0.3)

        # Empirical cumulative distribution function.
        sorted_data = np.sort(data)
        cumulative = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
        axes[1, 1].plot(sorted_data, cumulative, linewidth=2)
        axes[1, 1].set_xlabel(metric_col.replace('_', ' ').title(), fontsize=12)
        axes[1, 1].set_ylabel('Cumulative Probability', fontsize=12)
        axes[1, 1].set_title('Cumulative Distribution Function', fontsize=12, fontweight='bold')
        axes[1, 1].grid(True, alpha=0.3)

        plt.suptitle(f'Distribution Analysis: {metric_col.replace("_", " ").title()}',
                     fontsize=16, fontweight='bold')
        self._save(filename)

    def plot_regression_results(self, regression_results: Dict,
                                filename: str = 'regression_results.png'):
        """Visualize regression analysis results.

        Left panel: coefficient values with 95% confidence intervals,
        colored red when the coefficient's p-value < 0.05.  Right panel:
        the p-values themselves against the 0.05 significance line.

        Args:
            regression_results: Dict with at least ``coefficients``
                (feature -> value); optionally ``p_values``,
                ``confidence_intervals`` (feature -> (lo, hi)), and
                ``r_squared``.
            filename: Output PNG file name inside ``output_dir``.
        """
        if 'coefficients' not in regression_results:
            return

        coefficients = regression_results['coefficients']
        p_values = regression_results.get('p_values', {})
        ci = regression_results.get('confidence_intervals', {})

        if not coefficients:
            return

        fig, axes = plt.subplots(1, 2, figsize=(14, 6))

        # Left: coefficient plot with confidence intervals.
        features = list(coefficients.keys())
        coef_values = list(coefficients.values())
        # Missing p-values default to 1 (i.e. treated as non-significant).
        colors = ['red' if p_values.get(f, 1) < 0.05 else 'gray'
                  for f in features]

        y_pos = np.arange(len(features))
        axes[0].barh(y_pos, coef_values, color=colors, alpha=0.7)

        for i, feature in enumerate(features):
            if feature in ci:
                ci_lower, ci_upper = ci[feature]
                axes[0].plot([ci_lower, ci_upper], [i, i], 'k-', linewidth=2)

        axes[0].set_yticks(y_pos)
        axes[0].set_yticklabels(features)
        axes[0].set_xlabel('Coefficient Value', fontsize=12)
        axes[0].set_title('Regression Coefficients with 95% CI', fontsize=14, fontweight='bold')
        axes[0].axvline(x=0, color='black', linestyle='--', linewidth=1)
        axes[0].grid(True, alpha=0.3)

        # Right: p-values against the significance threshold.
        p_vals = [p_values.get(f, 1) for f in features]
        axes[1].barh(y_pos, p_vals, color=colors, alpha=0.7)
        axes[1].axvline(x=0.05, color='red', linestyle='--', linewidth=2,
                        label='Significance Level (0.05)')
        axes[1].set_yticks(y_pos)
        axes[1].set_yticklabels(features)
        axes[1].set_xlabel('P-value', fontsize=12)
        axes[1].set_title('P-values for Coefficients', fontsize=14, fontweight='bold')
        axes[1].set_xlim([0, max(p_vals) * 1.1])
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        plt.suptitle(f'Regression Analysis Results (R² = {regression_results.get("r_squared", 0):.3f})',
                     fontsize=16, fontweight='bold')
        self._save(filename)
+