"""
Main analysis script for code metrics analysis project.

Orchestrates data collection, analysis, and visualization.
"""

import json
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from github_client import GitHubClient
from code_analyzer import CodeAnalyzer
from data_collector import DataCollector
from statistical_analysis import StatisticalAnalyzer
from visualizer import Visualizer
from config import (
    MAX_REPOSITORIES, MIN_STARS, EXCLUDE_DIRS, OUTPUT_DIR, FIGURES_DIR,
    SIGNIFICANCE_LEVEL, CONFIDENCE_LEVEL, CURATED_REPOSITORIES,
    MAX_WORKERS, PARALLEL_REPOS, PARALLEL_FILES,
    USE_EXISTING_METRICS, RAW_METRICS_FILE, FOCUSED_MODE
)

# Predictor columns handed to the regression step (identical in both modes).
_COMPLEXITY_FEATURES = (
    'loc', 'cyclomatic_complexity', 'cognitive_complexity',
    'max_complexity', 'avg_complexity', 'max_inheritance_depth',
)

# Columns for which confidence intervals are requested.
_CI_METRICS = ('cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count')

# Columns reported as "Mean ± Std" in the final summary.
_SUMMARY_COMPLEXITY_METRICS = (
    'loc', 'cyclomatic_complexity', 'cognitive_complexity',
    'max_complexity', 'avg_complexity',
)
_SUMMARY_FIX_METRICS = ('fix_count', 'total_fixes')


def _json_key(key: Any) -> Any:
    """Return *key* in a form ``json.dump`` accepts as an object key.

    ``json`` only allows ``str``, ``int``, ``float``, ``bool`` and ``None``
    keys and raises ``TypeError`` otherwise (``default=`` is not consulted for
    keys), so numpy scalars are unwrapped and anything else is stringified.
    """
    if isinstance(key, np.bool_):
        return bool(key)
    if isinstance(key, np.integer):
        return int(key)
    if key is None or isinstance(key, (str, int, float, bool)):
        return key
    return str(key)


def _to_serializable(obj: Any) -> Any:
    """Recursively convert numpy/pandas values into JSON-friendly Python objects.

    numpy integers become ``int`` (not ``float``), numpy floats become
    ``float``, numpy booleans become ``bool``, arrays/tuples become lists, and
    DataFrames/Series are converted through ``to_dict()``. Dictionary keys are
    normalized with :func:`_json_key`. Any other object is returned unchanged.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return _to_serializable(obj.tolist())
    if isinstance(obj, (pd.DataFrame, pd.Series)):
        return _to_serializable(obj.to_dict())
    if isinstance(obj, dict):
        return {_json_key(k): _to_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_serializable(item) for item in obj]
    return obj


def _load_existing_metrics(raw_metrics_path: Path) -> Optional[List[Dict]]:
    """Read previously saved raw metrics; return ``None`` if loading fails."""
    print(f"\n2. Loading existing raw metrics from {RAW_METRICS_FILE}...")
    try:
        df = pd.read_csv(raw_metrics_path)
        print(f" ✓ Loaded {len(df)} file metrics from existing data")
        return df.to_dict('records')
    except Exception as e:
        # Deliberate best-effort: any read/parse problem falls back to collection.
        print(f" ✗ Error loading existing metrics: {e}")
        print(" Falling back to data collection...")
        return None


def _process_repository(github_client, data_collector,
                        owner: str, repo_name: str) -> Optional[Dict]:
    """Process a single repository and return its metrics and summary.

    Returns a dict with ``'metrics'`` and ``'summary'`` keys, or ``None`` when
    no data was collected or an error occurred (errors are printed, not raised,
    so one failing repository does not abort the whole run).
    """
    try:
        print(f" Processing {owner}/{repo_name}...")

        # The star count is optional metadata: fall back to 0 on failure, but
        # do not swallow KeyboardInterrupt/SystemExit like a bare except would.
        try:
            repo_info = github_client.get_repository_info(owner, repo_name)
            stars = repo_info.get('stargazers_count', 0)
        except Exception as e:
            print(f" ! {owner}/{repo_name}: could not fetch repository info ({e}); assuming 0 stars")
            stars = 0

        repo_data = data_collector.collect_repository_data(
            owner, repo_name,
            parallel_files=PARALLEL_FILES,
            max_workers=MAX_WORKERS
        )

        if repo_data and repo_data['code_metrics']:
            print(f" ✓ {owner}/{repo_name}: {len(repo_data['code_metrics'])} files, {repo_data['total_fixes']} fixes")
            return {
                'metrics': repo_data['code_metrics'],
                'summary': {
                    'owner': owner,
                    'repo': repo_name,
                    'files_analyzed': len(repo_data['code_metrics']),
                    'total_fixes': repo_data['total_fixes'],
                    'stars': stars
                }
            }

        print(f" ✗ {owner}/{repo_name}: No data collected")
        return None
    except Exception as e:
        print(f" ✗ {owner}/{repo_name}: Error - {e}")
        traceback.print_exc()
        return None


def _collect_metrics(github_client, data_collector) -> Tuple[List[Dict], List[Dict]]:
    """Collect metrics from the curated repositories.

    Repositories are processed in a thread pool when ``PARALLEL_REPOS`` is set
    and more than one repository is selected, otherwise sequentially.

    Returns:
        ``(all_metrics, repo_summaries)`` - the per-file metric records of all
        repositories and one summary dict per successfully processed repository.
    """
    print("\n2. Using curated list of popular Python repositories...")
    repositories = CURATED_REPOSITORIES[:MAX_REPOSITORIES]

    print(f"Selected {len(repositories)} repositories:")
    for owner, repo_name in repositories:
        print(f" - {owner}/{repo_name}")

    print("\n3. Collecting data from repositories...")
    all_metrics: List[Dict] = []
    repo_summaries: List[Dict] = []

    def record(result: Optional[Dict]) -> None:
        # Results are merged only on the calling thread, so no locking is needed.
        if result:
            all_metrics.extend(result['metrics'])
            repo_summaries.append(result['summary'])

    if PARALLEL_REPOS and len(repositories) > 1:
        print(f" Processing {len(repositories)} repositories in parallel...")
        worker_count = min(len(repositories), MAX_WORKERS or 4)
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = {
                executor.submit(_process_repository, github_client,
                                data_collector, owner, repo_name): (owner, repo_name)
                for owner, repo_name in repositories
            }
            for future in as_completed(futures):
                owner, repo_name = futures[future]
                try:
                    record(future.result())
                except Exception as e:
                    print(f" ✗ {owner}/{repo_name}: Failed - {e}")
    else:
        for i, (owner, repo_name) in enumerate(repositories, 1):
            print(f"\n[{i}/{len(repositories)}] Processing {owner}/{repo_name}...")
            record(_process_repository(github_client, data_collector, owner, repo_name))

    return all_metrics, repo_summaries


def _run_statistical_analysis(stat_analyzer, df) -> Dict[str, Any]:
    """Run the analyses for the configured mode and return them keyed by name.

    In focused mode only regression, t-test, z-test, hypothesis testing and
    confidence intervals are run; the remaining entries are empty dicts. The
    order of the calls (and of the progress messages) differs between the two
    modes and is kept as-is.
    """
    analysis: Dict[str, Any] = {
        'correlation': {},
        'var_cov': {},
        'pivot': {},
        'distribution': {},
    }
    # Fresh lists per call so the analyzer can never mutate the constants.
    features = list(_COMPLEXITY_FEATURES)
    ci_metrics = list(_CI_METRICS)

    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression and Hypothesis Testing Only]")

        print(" - Regression analysis...")
        analysis['regression'] = stat_analyzer.regression_analysis(
            df, features, target='fix_count'
        )

        print(" - T-test analysis...")
        analysis['t_test'] = stat_analyzer.t_test_analysis(df)

        print(" - Z-test analysis...")
        analysis['z_test'] = stat_analyzer.z_test_analysis(df)

        print(" - Hypothesis testing (ANOVA)...")
        analysis['hypothesis'] = stat_analyzer.hypothesis_testing(df)

        print(" - Confidence intervals...")
        analysis['ci'] = stat_analyzer.confidence_intervals(df, ci_metrics)
    else:
        print(" - Correlation analysis...")
        analysis['correlation'] = stat_analyzer.correlation_analysis(df)

        print(" - Regression analysis...")
        analysis['regression'] = stat_analyzer.regression_analysis(
            df, features, target='fix_count'
        )

        print(" - Hypothesis testing...")
        analysis['hypothesis'] = stat_analyzer.hypothesis_testing(df)

        print(" - T-test analysis...")
        analysis['t_test'] = stat_analyzer.t_test_analysis(df)

        print(" - Z-test analysis...")
        analysis['z_test'] = stat_analyzer.z_test_analysis(df)

        print(" - Confidence intervals...")
        analysis['ci'] = stat_analyzer.confidence_intervals(df, ci_metrics)

        print(" - Variance-covariance analysis...")
        analysis['var_cov'] = stat_analyzer.variance_covariance_analysis(df)

        print(" - Pivot table analysis...")
        analysis['pivot'] = stat_analyzer.pivot_table_analysis(df)

        print(" - Discrete distribution analysis...")
        analysis['distribution'] = stat_analyzer.discrete_distribution_analysis(df)

    return analysis


def _save_results(analysis: Dict[str, Any], repo_summaries: List[Dict]) -> None:
    """Write all analysis results to ``analysis_results.json`` in OUTPUT_DIR."""
    results = {
        'correlation_analysis': analysis['correlation'],
        'regression_analysis': analysis['regression'],
        'hypothesis_testing': analysis['hypothesis'],
        't_test_analysis': analysis['t_test'],
        'z_test_analysis': analysis['z_test'],
        'confidence_intervals': analysis['ci'],
        'variance_covariance': analysis['var_cov'],
        'pivot_tables': analysis['pivot'],  # DataFrames are converted below
        'distribution_analysis': analysis['distribution'],
        'repository_summaries': repo_summaries,
        'analysis_mode': 'focused' if FOCUSED_MODE else 'full'
    }

    with open(Path(OUTPUT_DIR) / 'analysis_results.json', 'w', encoding='utf-8') as f:
        # default=str remains as a last resort for types not handled above.
        json.dump(_to_serializable(results), f, indent=2, default=str)
    print(f" ✓ Saved analysis results to {OUTPUT_DIR}/analysis_results.json")


def _create_visualizations(visualizer, df, regression_results) -> None:
    """Produce the figures for the configured mode via *visualizer*."""
    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression visualizations only]")

        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count', 'loc_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )
    else:
        print(" - Correlation heatmap...")
        visualizer.plot_correlation_heatmap(
            df,
            complexity_cols=['loc', 'cyclomatic_complexity', 'cognitive_complexity',
                             'max_complexity', 'avg_complexity'],
            issue_cols=['fix_count', 'total_fixes']
        )

        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cyclomatic_complexity', 'fix_count',
            'cyclomatic_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count', 'loc_vs_fixes.png'
        )

        print(" - Module comparison...")
        visualizer.plot_module_complexity_comparison(df, 'cyclomatic_complexity')

        print(" - Distribution analysis...")
        visualizer.plot_distribution_analysis(df, 'cyclomatic_complexity')
        visualizer.plot_distribution_analysis(df, 'fix_count', 'fix_distribution.png')

    # The regression plot is produced in both modes when results exist.
    if regression_results:
        print(" - Regression results...")
        visualizer.plot_regression_results(regression_results)

    print(f" ✓ Saved visualizations to {FIGURES_DIR}/")


def _print_mean_std(df, heading: str, metrics) -> None:
    """Print ``mean ± std`` for each of *metrics* that is a column of *df*."""
    print(heading)
    for metric in metrics:
        if metric in df.columns:
            mean_val = df[metric].mean()
            std_val = df[metric].std()
            print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}")


def _print_summary(df, repo_summaries: List[Dict], analysis: Dict[str, Any]) -> None:
    """Print the closing summary of the data set and the analysis results."""
    print("=" * 80)
    print(f"\nTotal files analyzed: {len(df)}")
    if repo_summaries:
        print(f"Total repositories: {len(repo_summaries)}")

    _print_mean_std(df, "\nComplexity Metrics (Mean ± Std):", _SUMMARY_COMPLEXITY_METRICS)
    _print_mean_std(df, "\nFix Metrics (Mean ± Std):", _SUMMARY_FIX_METRICS)

    if not FOCUSED_MODE:
        print("\nSignificant Correlations:")
        sig_corr = analysis['correlation'].get('significant_correlations', {})
        if sig_corr:
            for metric, corr_value in list(sig_corr.items())[:10]:
                print(f" {metric}: {corr_value:.3f}")
        else:
            print(" None found")

    regression_results = analysis['regression']
    if regression_results:
        print("\nRegression Analysis:")
        print(f" R²: {regression_results.get('r_squared', 0):.3f}")
        print(f" RMSE: {regression_results.get('rmse', 0):.3f}")
        if regression_results.get('significant_features'):
            print(f" Significant features: {regression_results['significant_features']}")

    t_test_results = analysis['t_test']
    if t_test_results:
        print("\nT-Test Results (High-fix vs Low-fix files):")
        for metric, result in t_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" t-statistic: {result.get('t_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" High-fix mean: {result.get('high_fix_mean', 0):.2f}")
                print(f" Low-fix mean: {result.get('low_fix_mean', 0):.2f}")

    z_test_results = analysis['z_test']
    if z_test_results:
        print("\nZ-Test Results (High-fix vs Low-fix files):")
        for metric, result in z_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" z-statistic: {result.get('z_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" Mean difference: {result.get('mean_difference', 0):.2f}")
                print(f" 95% CI: [{result.get('ci_lower', 0):.2f}, {result.get('ci_upper', 0):.2f}]")

    hypothesis_results = analysis['hypothesis']
    if hypothesis_results:
        print("\nHypothesis Testing (ANOVA/Kruskal-Wallis):")
        for test_name, result in list(hypothesis_results.items())[:5]:
            if isinstance(result, dict) and 'p_value' in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {test_name}: p={result.get('p_value', 1):.4f} {sig_marker}")

    print("\n" + "=" * 80)
    print("Analysis complete!")
    print(f"Results saved to: {OUTPUT_DIR}/")
    print(f"Figures saved to: {FIGURES_DIR}/")
    print("=" * 80)


def main():
    """Main analysis pipeline.

    Loads previously saved raw metrics when ``USE_EXISTING_METRICS`` is set and
    ``RAW_METRICS_FILE`` exists (falling back to collection if loading fails),
    otherwise collects metrics from the curated repositories. It then runs the
    statistical analysis, saves the results as JSON, creates the figures and
    prints a summary. Returns ``None``; exits early when no metrics exist.
    """
    print("=" * 80)
    print("Code Metrics Analysis Pipeline")
    print("=" * 80)

    # parents=True so nested output locations do not fail on a fresh checkout.
    Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
    Path(FIGURES_DIR).mkdir(parents=True, exist_ok=True)

    print("\n1. Initializing components...")
    stat_analyzer = StatisticalAnalyzer(
        significance_level=SIGNIFICANCE_LEVEL,
        confidence_level=CONFIDENCE_LEVEL
    )
    visualizer = Visualizer(output_dir=FIGURES_DIR)

    raw_metrics_path = Path(RAW_METRICS_FILE)
    use_existing = USE_EXISTING_METRICS and raw_metrics_path.exists()

    all_metrics: Optional[List[Dict]] = None
    repo_summaries: List[Dict] = []  # not available when metrics come from CSV
    if use_existing:
        all_metrics = _load_existing_metrics(raw_metrics_path)
        use_existing = all_metrics is not None

    data_collector = None
    try:
        if not use_existing:
            github_client = GitHubClient()
            code_analyzer = CodeAnalyzer(exclude_dirs=EXCLUDE_DIRS)
            data_collector = DataCollector(code_analyzer)

            all_metrics, repo_summaries = _collect_metrics(github_client, data_collector)
            if not all_metrics:
                print("\nNo metrics collected. Exiting.")
                return

            print("\n4. Preparing data for analysis...")
            df = stat_analyzer.prepare_dataframe(all_metrics)
            print(f"Total files analyzed: {len(df)}")
            print(f"Columns: {list(df.columns)}")

            # NOTE(review): written to OUTPUT_DIR/raw_metrics.csv but read back
            # from RAW_METRICS_FILE - confirm both refer to the same file.
            df.to_csv(Path(OUTPUT_DIR) / 'raw_metrics.csv', index=False)
            print(f" ✓ Saved raw metrics to {OUTPUT_DIR}/raw_metrics.csv")
        else:
            print("\n3. Preparing data for analysis...")
            df = stat_analyzer.prepare_dataframe(all_metrics)
            print(f"Total files analyzed: {len(df)}")
            print(f"Columns: {list(df.columns)}")

        if len(df) == 0:
            print("\nNo metrics available for analysis. Exiting.")
            return

        # Collection uses one more numbered step than loading from CSV.
        step = 4 if use_existing else 5

        print(f"\n{step}. Performing statistical analysis...")
        analysis = _run_statistical_analysis(stat_analyzer, df)

        print(f"\n{step + 1}. Saving analysis results...")
        _save_results(analysis, repo_summaries)

        print(f"\n{step + 2}. Creating visualizations...")
        _create_visualizations(visualizer, df, analysis['regression'])

        print(f"\n{step + 3}. Summary Statistics")
        _print_summary(df, repo_summaries, analysis)
    finally:
        # Always release the collector, including on early exit or failure.
        if data_collector is not None:
            data_collector.cleanup()


if __name__ == '__main__':
    main()