aboutsummaryrefslogtreecommitdiff
path: root/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'main.py')
-rw-r--r-- main.py 438
1 files changed, 438 insertions, 0 deletions
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..0440137
--- /dev/null
+++ b/main.py
@@ -0,0 +1,438 @@
+"""
+Main analysis script for code metrics analysis project.
+Orchestrates data collection, analysis, and visualization.
+"""
+import pandas as pd
+import numpy as np
+import json
+from pathlib import Path
+from typing import Optional, Dict
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from github_client import GitHubClient
+from code_analyzer import CodeAnalyzer
+from data_collector import DataCollector
+from statistical_analysis import StatisticalAnalyzer
+from visualizer import Visualizer
+from config import (
+ MAX_REPOSITORIES, MIN_STARS, EXCLUDE_DIRS,
+ OUTPUT_DIR, FIGURES_DIR, SIGNIFICANCE_LEVEL, CONFIDENCE_LEVEL,
+ CURATED_REPOSITORIES, MAX_WORKERS, PARALLEL_REPOS, PARALLEL_FILES,
+ USE_EXISTING_METRICS, RAW_METRICS_FILE, FOCUSED_MODE
+)
+
+
def _json_key(key):
    """Return *key* in a form ``json.dump`` accepts as an object key.

    ``json`` only allows ``str``, ``int``, ``float``, ``bool`` and ``None``
    keys; anything else (tuples, NumPy integers, ...) raises ``TypeError``
    and ``default=`` is never consulted for keys.
    """
    if isinstance(key, np.integer):
        return int(key)
    if key is None or isinstance(key, (str, int, float, bool)):
        return key
    return str(key)


def convert_to_serializable(obj):
    """Recursively convert *obj* into plain Python containers and scalars.

    NumPy booleans, integers and floats become ``bool``, ``int`` and
    ``float`` respectively; arrays, tuples and lists become lists; pandas
    DataFrames/Series are converted through ``to_dict()``; dict keys are
    normalised with ``_json_key``. Any other object is returned unchanged.

    Args:
        obj: Arbitrary (possibly nested) analysis result.

    Returns:
        The converted structure.
    """
    # np.bool_ is not an np.integer, so it needs its own branch; otherwise
    # it would fall through and be stringified by ``default=str`` later on.
    if isinstance(obj, (bool, np.bool_)):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return convert_to_serializable(obj.tolist())
    if isinstance(obj, (pd.DataFrame, pd.Series)):
        return convert_to_serializable(obj.to_dict())
    if isinstance(obj, dict):
        return {
            _json_key(key): convert_to_serializable(value)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple)):
        return [convert_to_serializable(item) for item in obj]
    return obj


def _process_repository(github_client, data_collector,
                        owner: str, repo_name: str) -> Optional[Dict]:
    """Collect metrics for one repository.

    Returns:
        A dict with ``'metrics'`` (the repository's ``code_metrics``) and
        ``'summary'`` (owner, repo, file count, total fixes, stars), or
        ``None`` when nothing was collected or an error occurred.
    """
    try:
        print(f" Processing {owner}/{repo_name}...")

        # The star count is best-effort: a failed lookup must not abort the
        # repository, but interpreter-exit signals must not be swallowed.
        try:
            repo_info = github_client.get_repository_info(owner, repo_name)
            stars = repo_info.get('stargazers_count', 0)
        except Exception as exc:
            print(f" ! {owner}/{repo_name}: Could not fetch star count ({exc}); using 0")
            stars = 0

        repo_data = data_collector.collect_repository_data(
            owner, repo_name,
            parallel_files=PARALLEL_FILES,
            max_workers=MAX_WORKERS
        )

        if not (repo_data and repo_data['code_metrics']):
            print(f" ✗ {owner}/{repo_name}: No data collected")
            return None

        metrics = repo_data['code_metrics']
        print(f" ✓ {owner}/{repo_name}: {len(metrics)} files, {repo_data['total_fixes']} fixes")
        return {
            'metrics': metrics,
            'summary': {
                'owner': owner,
                'repo': repo_name,
                'files_analyzed': len(metrics),
                'total_fixes': repo_data['total_fixes'],
                'stars': stars
            }
        }
    except Exception as e:
        print(f" ✗ {owner}/{repo_name}: Error - {e}")
        import traceback
        traceback.print_exc()
        return None


def _collect_metrics(github_client, data_collector, repositories):
    """Process every ``(owner, repo_name)`` pair, in parallel if configured.

    Returns:
        ``(all_metrics, repo_summaries)`` accumulated over the repositories
        that produced data.
    """
    all_metrics = []
    repo_summaries = []

    def record(result):
        if result:
            all_metrics.extend(result['metrics'])
            repo_summaries.append(result['summary'])

    if PARALLEL_REPOS and len(repositories) > 1:
        print(f" Processing {len(repositories)} repositories in parallel...")
        worker_count = min(len(repositories), MAX_WORKERS or 4)
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = {
                executor.submit(_process_repository, github_client,
                                data_collector, owner, repo_name): (owner, repo_name)
                for owner, repo_name in repositories
            }
            # Results are merged in the main thread as each future finishes.
            for future in as_completed(futures):
                owner, repo_name = futures[future]
                try:
                    record(future.result())
                except Exception as e:
                    print(f" ✗ {owner}/{repo_name}: Failed - {e}")
    else:
        for i, (owner, repo_name) in enumerate(repositories, 1):
            print(f"\n[{i}/{len(repositories)}] Processing {owner}/{repo_name}...")
            record(_process_repository(github_client, data_collector,
                                       owner, repo_name))

    return all_metrics, repo_summaries


def _run_statistical_analysis(stat_analyzer, df):
    """Run the focused or full set of analyses, depending on FOCUSED_MODE.

    Returns:
        Dict of raw analysis results keyed by the names used in
        ``analysis_results.json``; analyses skipped in focused mode are
        empty dicts.
    """
    complexity_features = [
        'loc', 'cyclomatic_complexity', 'cognitive_complexity',
        'max_complexity', 'avg_complexity', 'max_inheritance_depth'
    ]
    ci_metrics = ['cyclomatic_complexity', 'cognitive_complexity', 'loc', 'fix_count']

    # Defaults for the analyses that focused mode skips.
    correlation_results = {}
    var_cov_results = {}
    pivot_results = {}
    distribution_results = {}

    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression and Hypothesis Testing Only]")

        print(" - Regression analysis...")
        regression_results = stat_analyzer.regression_analysis(
            df, complexity_features, target='fix_count'
        )

        print(" - T-test analysis...")
        t_test_results = stat_analyzer.t_test_analysis(df)

        print(" - Z-test analysis...")
        z_test_results = stat_analyzer.z_test_analysis(df)

        print(" - Hypothesis testing (ANOVA)...")
        hypothesis_results = stat_analyzer.hypothesis_testing(df)

        print(" - Confidence intervals...")
        ci_results = stat_analyzer.confidence_intervals(df, ci_metrics)
    else:
        print(" - Correlation analysis...")
        correlation_results = stat_analyzer.correlation_analysis(df)

        print(" - Regression analysis...")
        regression_results = stat_analyzer.regression_analysis(
            df, complexity_features, target='fix_count'
        )

        print(" - Hypothesis testing...")
        hypothesis_results = stat_analyzer.hypothesis_testing(df)

        print(" - T-test analysis...")
        t_test_results = stat_analyzer.t_test_analysis(df)

        print(" - Z-test analysis...")
        z_test_results = stat_analyzer.z_test_analysis(df)

        print(" - Confidence intervals...")
        ci_results = stat_analyzer.confidence_intervals(df, ci_metrics)

        print(" - Variance-covariance analysis...")
        var_cov_results = stat_analyzer.variance_covariance_analysis(df)

        print(" - Pivot table analysis...")
        pivot_results = stat_analyzer.pivot_table_analysis(df)

        print(" - Discrete distribution analysis...")
        distribution_results = stat_analyzer.discrete_distribution_analysis(df)

    return {
        'correlation_analysis': correlation_results,
        'regression_analysis': regression_results,
        'hypothesis_testing': hypothesis_results,
        't_test_analysis': t_test_results,
        'z_test_analysis': z_test_results,
        'confidence_intervals': ci_results,
        'variance_covariance': var_cov_results,
        'pivot_tables': pivot_results,
        'distribution_analysis': distribution_results,
    }


def _save_results(analysis, repo_summaries):
    """Write the analysis results to ``OUTPUT_DIR/analysis_results.json``."""
    results = dict(analysis)
    results['pivot_tables'] = {
        k: v.to_dict() if isinstance(v, pd.DataFrame) else v
        for k, v in analysis['pivot_tables'].items()
    }
    results['repository_summaries'] = repo_summaries
    results['analysis_mode'] = 'focused' if FOCUSED_MODE else 'full'

    results_serializable = convert_to_serializable(results)

    with open(Path(OUTPUT_DIR) / 'analysis_results.json', 'w', encoding='utf-8') as f:
        # default=str remains as a last resort for types not handled above.
        json.dump(results_serializable, f, indent=2, default=str)
    print(f" ✓ Saved analysis results to {OUTPUT_DIR}/analysis_results.json")


def _create_visualizations(visualizer, df, regression_results):
    """Produce the figures for the active mode (focused or full)."""
    if FOCUSED_MODE:
        print(" [FOCUSED MODE: Regression visualizations only]")

        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count',
            'loc_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )
    else:
        print(" - Correlation heatmap...")
        visualizer.plot_correlation_heatmap(
            df,
            complexity_cols=['loc', 'cyclomatic_complexity', 'cognitive_complexity',
                             'max_complexity', 'avg_complexity'],
            issue_cols=['fix_count', 'total_fixes']
        )

        print(" - Scatter plots...")
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cyclomatic_complexity', 'fix_count',
            'cyclomatic_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'cognitive_complexity', 'fix_count',
            'cognitive_complexity_vs_fixes.png'
        )
        visualizer.plot_complexity_vs_issues_scatter(
            df, 'loc', 'fix_count',
            'loc_vs_fixes.png'
        )

        print(" - Module comparison...")
        visualizer.plot_module_complexity_comparison(
            df, 'cyclomatic_complexity'
        )

        print(" - Distribution analysis...")
        visualizer.plot_distribution_analysis(df, 'cyclomatic_complexity')
        visualizer.plot_distribution_analysis(df, 'fix_count',
                                              'fix_distribution.png')

    # Both modes finish with the regression plot when results exist.
    if regression_results:
        print(" - Regression results...")
        visualizer.plot_regression_results(regression_results)

    print(f" ✓ Saved visualizations to {FIGURES_DIR}/")


def _print_mean_std(df, metrics):
    """Print mean ± sample standard deviation for each metric column present."""
    for metric in metrics:
        if metric in df.columns:
            mean_val = df[metric].mean()
            std_val = df[metric].std()
            print(f" {metric:30s}: {mean_val:8.2f} ± {std_val:8.2f}")


def _print_summary(df, analysis, repo_summaries):
    """Print the human-readable summary of the data and analysis results."""
    print("=" * 80)
    print(f"\nTotal files analyzed: {len(df)}")
    if repo_summaries:
        print(f"Total repositories: {len(repo_summaries)}")

    print("\nComplexity Metrics (Mean ± Std):")
    _print_mean_std(df, ['loc', 'cyclomatic_complexity', 'cognitive_complexity',
                         'max_complexity', 'avg_complexity'])

    print("\nFix Metrics (Mean ± Std):")
    _print_mean_std(df, ['fix_count', 'total_fixes'])

    if not FOCUSED_MODE:
        print("\nSignificant Correlations:")
        sig_corr = analysis['correlation_analysis'].get('significant_correlations', {})
        if sig_corr:
            for metric, corr_value in list(sig_corr.items())[:10]:
                print(f" {metric}: {corr_value:.3f}")
        else:
            print(" None found")

    regression_results = analysis['regression_analysis']
    if regression_results:
        print("\nRegression Analysis:")
        print(f" R²: {regression_results.get('r_squared', 0):.3f}")
        print(f" RMSE: {regression_results.get('rmse', 0):.3f}")
        if regression_results.get('significant_features'):
            print(f" Significant features: {regression_results['significant_features']}")

    t_test_results = analysis['t_test_analysis']
    if t_test_results:
        print("\nT-Test Results (High-fix vs Low-fix files):")
        for metric, result in t_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" t-statistic: {result.get('t_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" High-fix mean: {result.get('high_fix_mean', 0):.2f}")
                print(f" Low-fix mean: {result.get('low_fix_mean', 0):.2f}")

    z_test_results = analysis['z_test_analysis']
    if z_test_results:
        print("\nZ-Test Results (High-fix vs Low-fix files):")
        for metric, result in z_test_results.items():
            if 'error' not in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {metric}:")
                print(f" z-statistic: {result.get('z_statistic', 0):.3f}")
                print(f" p-value: {result.get('p_value', 1):.4f} {sig_marker}")
                print(f" Mean difference: {result.get('mean_difference', 0):.2f}")
                print(f" 95% CI: [{result.get('ci_lower', 0):.2f}, {result.get('ci_upper', 0):.2f}]")

    hypothesis_results = analysis['hypothesis_testing']
    if hypothesis_results:
        print("\nHypothesis Testing (ANOVA/Kruskal-Wallis):")
        for test_name, result in list(hypothesis_results.items())[:5]:
            if isinstance(result, dict) and 'p_value' in result:
                sig_marker = "***" if result.get('significant') else ""
                print(f" {test_name}: p={result.get('p_value', 1):.4f} {sig_marker}")

    print("\n" + "=" * 80)
    print("Analysis complete!")
    print(f"Results saved to: {OUTPUT_DIR}/")
    print(f"Figures saved to: {FIGURES_DIR}/")
    print("=" * 80)


def main():
    """Main analysis pipeline.

    Steps: create the output directories, obtain file metrics (either from
    the existing CSV at RAW_METRICS_FILE or by collecting them from the
    curated repositories), run the statistical analyses, write
    ``analysis_results.json``, render figures and print a summary.

    The data collector's ``cleanup()`` is always called once a collector has
    been created, including on early exit and on errors.
    """
    print("=" * 80)
    print("Code Metrics Analysis Pipeline")
    print("=" * 80)

    # Create output directories (parents=True tolerates nested paths).
    Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
    Path(FIGURES_DIR).mkdir(parents=True, exist_ok=True)

    # Initialize components
    print("\n1. Initializing components...")
    stat_analyzer = StatisticalAnalyzer(
        significance_level=SIGNIFICANCE_LEVEL,
        confidence_level=CONFIDENCE_LEVEL
    )
    visualizer = Visualizer(output_dir=FIGURES_DIR)

    # Check if we should use existing metrics
    raw_metrics_path = Path(RAW_METRICS_FILE)
    use_existing = USE_EXISTING_METRICS and raw_metrics_path.exists()
    all_metrics = []
    repo_summaries = []  # Stays empty when metrics come from the CSV

    if use_existing:
        print(f"\n2. Loading existing raw metrics from {RAW_METRICS_FILE}...")
        try:
            df = pd.read_csv(raw_metrics_path)
            print(f" ✓ Loaded {len(df)} file metrics from existing data")
            all_metrics = df.to_dict('records')
        except Exception as e:
            print(f" ✗ Error loading existing metrics: {e}")
            print(" Falling back to data collection...")
            use_existing = False

    data_collector = None
    try:
        if not use_existing:
            # Initialize data collection components
            github_client = GitHubClient()
            code_analyzer = CodeAnalyzer(exclude_dirs=EXCLUDE_DIRS)
            data_collector = DataCollector(code_analyzer)

            print("\n2. Using curated list of popular Python repositories...")
            repositories_to_analyze = CURATED_REPOSITORIES[:MAX_REPOSITORIES]
            print(f"Selected {len(repositories_to_analyze)} repositories:")
            for owner, repo_name in repositories_to_analyze:
                print(f" - {owner}/{repo_name}")

            print("\n3. Collecting data from repositories...")
            all_metrics, repo_summaries = _collect_metrics(
                github_client, data_collector, repositories_to_analyze
            )

            if not all_metrics:
                print("\nNo metrics collected. Exiting.")
                return

            print("\n4. Preparing data for analysis...")
            df = stat_analyzer.prepare_dataframe(all_metrics)
            print(f"Total files analyzed: {len(df)}")
            print(f"Columns: {list(df.columns)}")

            # Save raw data
            df.to_csv(Path(OUTPUT_DIR) / 'raw_metrics.csv', index=False)
            print(f" ✓ Saved raw metrics to {OUTPUT_DIR}/raw_metrics.csv")
        else:
            print("\n3. Preparing data for analysis...")
            df = stat_analyzer.prepare_dataframe(all_metrics)
            print(f"Total files analyzed: {len(df)}")
            print(f"Columns: {list(df.columns)}")

        if len(df) == 0:
            print("\nNo metrics available for analysis. Exiting.")
            return

        # Step numbers are one lower when the collection step was skipped.
        step = 4 if use_existing else 5

        print(f"\n{step}. Performing statistical analysis...")
        analysis = _run_statistical_analysis(stat_analyzer, df)

        print(f"\n{step + 1}. Saving analysis results...")
        _save_results(analysis, repo_summaries)

        print(f"\n{step + 2}. Creating visualizations...")
        _create_visualizations(visualizer, df, analysis['regression_analysis'])

        print(f"\n{step + 3}. Summary Statistics")
        _print_summary(df, analysis, repo_summaries)
    finally:
        # Cleanup (only if we created a collector), even on early return/error.
        if data_collector is not None:
            data_collector.cleanup()
+
+
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
+