Diffstat (limited to 'main.py')
-rw-r--r--  main.py | 438
1 file changed, 438 insertions(+), 0 deletions(-)
@@ -0,0 +1,438 @@
"""
Main analysis script for the code metrics analysis project.
Orchestrates data collection, analysis, and visualization.
"""
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Optional

import numpy as np
import pandas as pd

from github_client import GitHubClient
from code_analyzer import CodeAnalyzer
from data_collector import DataCollector
from statistical_analysis import StatisticalAnalyzer
from visualizer import Visualizer
from config import (
    MAX_REPOSITORIES, MIN_STARS, EXCLUDE_DIRS,
    OUTPUT_DIR, FIGURES_DIR, SIGNIFICANCE_LEVEL, CONFIDENCE_LEVEL,
    CURATED_REPOSITORIES, MAX_WORKERS, PARALLEL_REPOS, PARALLEL_FILES,
    USE_EXISTING_METRICS, RAW_METRICS_FILE, FOCUSED_MODE
)


def main():
    """Main analysis pipeline."""
    print("=" * 80)
    print("Code Metrics Analysis Pipeline")
    print("=" * 80)

    # Create output directories
    Path(OUTPUT_DIR).mkdir(exist_ok=True)
    Path(FIGURES_DIR).mkdir(exist_ok=True)

    # Initialize components
    print("\n1. Initializing components...")
    stat_analyzer = StatisticalAnalyzer(
        significance_level=SIGNIFICANCE_LEVEL,
        confidence_level=CONFIDENCE_LEVEL
    )
    visualizer = Visualizer(output_dir=FIGURES_DIR)

    # Check whether we can reuse previously collected metrics
    raw_metrics_path = Path(RAW_METRICS_FILE)
    use_existing = USE_EXISTING_METRICS and raw_metrics_path.exists()

    if use_existing:
        print(f"\n2. Loading existing raw metrics from {RAW_METRICS_FILE}...")
        try:
            df = pd.read_csv(raw_metrics_path)
            print(f" ✓ Loaded {len(df)} file metrics from existing data")
            all_metrics = df.to_dict('records')
            repo_summaries = []  # We don't have repo summaries from CSV
        except Exception as e:
            print(f" ✗ Error loading existing metrics: {e}")
            print(" Falling back to data collection...")
            use_existing = False

    if not use_existing:
        # Initialize data collection components
        github_client = GitHubClient()
        code_analyzer = CodeAnalyzer(exclude_dirs=EXCLUDE_DIRS)
        data_collector = DataCollector(code_analyzer)

        # Use a curated list of popular Python repositories
        print("\n2. Using curated list of popular Python repositories...")
        repositories_to_analyze = CURATED_REPOSITORIES[:MAX_REPOSITORIES]
        print(f"Selected {len(repositories_to_analyze)} repositories:")
        for owner, repo_name in repositories_to_analyze:
            print(f" - {owner}/{repo_name}")

        # Collect data from repositories (parallelized)
        print("\n3. Collecting data from repositories...")
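        # Each repository is handled by process_repository() below; per-file
        # rows are merged into all_metrics and per-repository rollups into
        # repo_summaries, so the analysis stage sees one flat dataset either way.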
Collecting data from repositories...") + all_metrics = [] + repo_summaries = [] + + def process_repository(owner: str, repo_name: str) -> Optional[Dict]: + """Process a single repository and return results.""" + try: + print(f" Processing {owner}/{repo_name}...") + + # Get repository info for stars count + try: + repo_info = github_client.get_repository_info(owner, repo_name) + stars = repo_info.get('stargazers_count', 0) + except: + stars = 0 + + repo_data = data_collector.collect_repository_data( + owner, repo_name, + parallel_files=PARALLEL_FILES, + max_workers=MAX_WORKERS + ) + + if repo_data and repo_data['code_metrics']: + print(f" ✓ {owner}/{repo_name}: {len(repo_data['code_metrics'])} files, {repo_data['total_fixes']} fixes") + return { + 'metrics': repo_data['code_metrics'], + 'summary': { + 'owner': owner, + 'repo': repo_name, + 'files_analyzed': len(repo_data['code_metrics']), + 'total_fixes': repo_data['total_fixes'], + 'stars': stars + } + } + else: + print(f" ✗ {owner}/{repo_name}: No data collected") + return None + except Exception as e: + print(f" ✗ {owner}/{repo_name}: Error - {e}") + import traceback + traceback.print_exc() + return None + + if PARALLEL_REPOS and len(repositories_to_analyze) > 1: + # Process repositories in parallel + print(f" Processing {len(repositories_to_analyze)} repositories in parallel...") + with ThreadPoolExecutor(max_workers=min(len(repositories_to_analyze), MAX_WORKERS or 4)) as executor: + futures = {executor.submit(process_repository, owner, repo_name): (owner, repo_name) + for owner, repo_name in repositories_to_analyze} + + for future in as_completed(futures): + owner, repo_name = futures[future] + try: + result = future.result() + if result: + all_metrics.extend(result['metrics']) + repo_summaries.append(result['summary']) + except Exception as e: + print(f" ✗ {owner}/{repo_name}: Failed - {e}") + else: + # Sequential processing + for i, (owner, repo_name) in enumerate(repositories_to_analyze, 1): + print(f"\n[{i}/{len(repositories_to_analyze)}] Processing {owner}/{repo_name}...") + result = process_repository(owner, repo_name) + if result: + all_metrics.extend(result['metrics']) + repo_summaries.append(result['summary']) + + if not all_metrics: + print("\nNo metrics collected. Exiting.") + return + + # Convert to DataFrame + print("\n4. Preparing data for analysis...") + df = stat_analyzer.prepare_dataframe(all_metrics) + print(f"Total files analyzed: {len(df)}") + print(f"Columns: {list(df.columns)}") + + # Save raw data + df.to_csv(Path(OUTPUT_DIR) / 'raw_metrics.csv', index=False) + print(f" ✓ Saved raw metrics to {OUTPUT_DIR}/raw_metrics.csv") + else: + # Already have DataFrame from CSV + print("\n3. Preparing data for analysis...") + df = stat_analyzer.prepare_dataframe(all_metrics) + print(f"Total files analyzed: {len(df)}") + print(f"Columns: {list(df.columns)}") + + if len(df) == 0: + print("\nNo metrics available for analysis. Exiting.") + return + + # Statistical Analysis + step_num = "4" if use_existing else "5" + print(f"\n{step_num}. 
Performing statistical analysis...") + + if FOCUSED_MODE: + print(" [FOCUSED MODE: Regression and Hypothesis Testing Only]") + + # Regression analysis + print(" - Regression analysis...") + complexity_features = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity', 'max_inheritance_depth' + ] + regression_results = stat_analyzer.regression_analysis( + df, complexity_features, target='fix_count' + ) + + # Hypothesis testing - t-tests + print(" - T-test analysis...") + t_test_results = stat_analyzer.t_test_analysis(df) + + # Hypothesis testing - z-tests + print(" - Z-test analysis...") + z_test_results = stat_analyzer.z_test_analysis(df) + + # Hypothesis testing - module comparisons (ANOVA) + print(" - Hypothesis testing (ANOVA)...") + hypothesis_results = stat_analyzer.hypothesis_testing(df) + + # Confidence intervals for key metrics + print(" - Confidence intervals...") + ci_results = stat_analyzer.confidence_intervals( + df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count'] + ) + + # Initialize empty results for skipped analyses + correlation_results = {} + var_cov_results = {} + pivot_results = {} + distribution_results = {} + else: + # Full analysis mode + # Correlation analysis + print(" - Correlation analysis...") + correlation_results = stat_analyzer.correlation_analysis(df) + + # Regression analysis + print(" - Regression analysis...") + complexity_features = [ + 'loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity', 'max_inheritance_depth' + ] + regression_results = stat_analyzer.regression_analysis( + df, complexity_features, target='fix_count' + ) + + # Hypothesis testing + print(" - Hypothesis testing...") + hypothesis_results = stat_analyzer.hypothesis_testing(df) + + # T-test analysis + print(" - T-test analysis...") + t_test_results = stat_analyzer.t_test_analysis(df) + + # Z-test analysis + print(" - Z-test analysis...") + z_test_results = stat_analyzer.z_test_analysis(df) + + # Confidence intervals + print(" - Confidence intervals...") + ci_results = stat_analyzer.confidence_intervals( + df, ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count'] + ) + + # Variance-covariance analysis + print(" - Variance-covariance analysis...") + var_cov_results = stat_analyzer.variance_covariance_analysis(df) + + # Pivot table analysis + print(" - Pivot table analysis...") + pivot_results = stat_analyzer.pivot_table_analysis(df) + + # Discrete distribution analysis + print(" - Discrete distribution analysis...") + distribution_results = stat_analyzer.discrete_distribution_analysis(df) + + # Save analysis results + step_num = "5" if use_existing else "6" + print(f"\n{step_num}. 
Saving analysis results...") + results = { + 'correlation_analysis': correlation_results, + 'regression_analysis': regression_results, + 'hypothesis_testing': hypothesis_results, + 't_test_analysis': t_test_results if 't_test_results' in locals() else {}, + 'z_test_analysis': z_test_results if 'z_test_results' in locals() else {}, + 'confidence_intervals': ci_results, + 'variance_covariance': var_cov_results, + 'pivot_tables': {k: v.to_dict() if isinstance(v, pd.DataFrame) else v + for k, v in pivot_results.items()}, + 'distribution_analysis': distribution_results, + 'repository_summaries': repo_summaries, + 'analysis_mode': 'focused' if FOCUSED_MODE else 'full' + } + + # Convert numpy types to native Python types for JSON serialization + def convert_to_serializable(obj): + if isinstance(obj, (np.integer, np.floating)): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, dict): + return {k: convert_to_serializable(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_to_serializable(item) for item in obj] + elif isinstance(obj, pd.DataFrame): + return obj.to_dict() + return obj + + results_serializable = convert_to_serializable(results) + + with open(Path(OUTPUT_DIR) / 'analysis_results.json', 'w') as f: + json.dump(results_serializable, f, indent=2, default=str) + print(f" ✓ Saved analysis results to {OUTPUT_DIR}/analysis_results.json") + + # Create visualizations + print("\n" + ("6" if use_existing else "7") + ". Creating visualizations...") + + if FOCUSED_MODE: + print(" [FOCUSED MODE: Regression visualizations only]") + + # Scatter plots for key relationships + print(" - Scatter plots...") + visualizer.plot_complexity_vs_issues_scatter( + df, 'loc', 'fix_count', + 'loc_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'cognitive_complexity', 'fix_count', + 'cognitive_complexity_vs_fixes.png' + ) + + # Regression results + if regression_results: + print(" - Regression results...") + visualizer.plot_regression_results(regression_results) + else: + # Full visualization mode + # Correlation heatmap + print(" - Correlation heatmap...") + visualizer.plot_correlation_heatmap( + df, + complexity_cols=['loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity'], + issue_cols=['fix_count', 'total_fixes'] + ) + + # Scatter plots + print(" - Scatter plots...") + visualizer.plot_complexity_vs_issues_scatter( + df, 'cyclomatic_complexity', 'fix_count', + 'cyclomatic_complexity_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'cognitive_complexity', 'fix_count', + 'cognitive_complexity_vs_fixes.png' + ) + visualizer.plot_complexity_vs_issues_scatter( + df, 'loc', 'fix_count', + 'loc_vs_fixes.png' + ) + + # Module comparison + print(" - Module comparison...") + visualizer.plot_module_complexity_comparison( + df, 'cyclomatic_complexity' + ) + + # Distribution analysis + print(" - Distribution analysis...") + visualizer.plot_distribution_analysis(df, 'cyclomatic_complexity') + visualizer.plot_distribution_analysis(df, 'fix_count', + 'fix_distribution.png') + + # Regression results + if regression_results: + print(" - Regression results...") + visualizer.plot_regression_results(regression_results) + + print(f" ✓ Saved visualizations to {FIGURES_DIR}/") + + # Print summary statistics + step_num = "7" if use_existing else "8" + print(f"\n{step_num}. 
Summary Statistics") + print("=" * 80) + print(f"\nTotal files analyzed: {len(df)}") + if repo_summaries: + print(f"Total repositories: {len(repo_summaries)}") + + print("\nComplexity Metrics (Mean ± Std):") + for metric in ['loc', 'cyclomatic_complexity', 'cognitive_complexity', + 'max_complexity', 'avg_complexity']: + if metric in df.columns: + mean_val = df[metric].mean() + std_val = df[metric].std() + print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}") + + print("\nFix Metrics (Mean ± Std):") + for metric in ['fix_count', 'total_fixes']: + if metric in df.columns: + mean_val = df[metric].mean() + std_val = df[metric].std() + print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}") + + if not FOCUSED_MODE: + print("\nSignificant Correlations:") + sig_corr = correlation_results.get('significant_correlations', {}) + if sig_corr: + for metric, corr_value in list(sig_corr.items())[:10]: + print(f" {metric}: {corr_value:.3f}") + else: + print(" None found") + + if regression_results: + print(f"\nRegression Analysis:") + print(f" R²: {regression_results.get('r_squared', 0):.3f}") + print(f" RMSE: {regression_results.get('rmse', 0):.3f}") + if regression_results.get('significant_features'): + print(f" Significant features: {regression_results['significant_features']}") + + # T-test results + if 't_test_results' in locals() and t_test_results: + print(f"\nT-Test Results (High-fix vs Low-fix files):") + for metric, result in t_test_results.items(): + if 'error' not in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {metric}:") + print(f" t-statistic: {result.get('t_statistic', 0):.3f}") + print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}") + print(f" High-fix mean: {result.get('high_fix_mean', 0):.2f}") + print(f" Low-fix mean: {result.get('low_fix_mean', 0):.2f}") + + # Z-test results + if 'z_test_results' in locals() and z_test_results: + print(f"\nZ-Test Results (High-fix vs Low-fix files):") + for metric, result in z_test_results.items(): + if 'error' not in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {metric}:") + print(f" z-statistic: {result.get('z_statistic', 0):.3f}") + print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}") + print(f" Mean difference: {result.get('mean_difference', 0):.2f}") + print(f" 95% CI: [{result.get('ci_lower', 0):.2f}, {result.get('ci_upper', 0):.2f}]") + + # Hypothesis testing results + if hypothesis_results: + print(f"\nHypothesis Testing (ANOVA/Kruskal-Wallis):") + for test_name, result in list(hypothesis_results.items())[:5]: + if isinstance(result, dict) and 'p_value' in result: + sig_marker = "***" if result.get('significant') else "" + print(f" {test_name}: p={result.get('p_value', 1):.4f} {sig_marker}") + + print("\n" + "=" * 80) + print("Analysis complete!") + print(f"Results saved to: {OUTPUT_DIR}/") + print(f"Figures saved to: {FIGURES_DIR}/") + print("=" * 80) + + # Cleanup (only if we collected data) + if not use_existing: + data_collector.cleanup() + + +if __name__ == '__main__': + main() + |