diff options
Diffstat (limited to 'statistical_analysis.py')
| -rw-r--r-- | statistical_analysis.py | 553 |
1 file changed, 553 insertions, 0 deletions
"""
Statistical analysis module for code metrics and issue data.

Provides :class:`StatisticalAnalyzer`, which runs correlation, regression,
hypothesis-testing, confidence-interval, and distribution analyses over
per-file code metrics (size/complexity columns) and issue-fix counts.
"""
import pandas as pd
import numpy as np
from scipy import stats
from scipy.stats import pearsonr, spearmanr, chi2_contingency
from typing import Dict, List, Tuple
import warnings

# NOTE(review): suppressing *all* warnings at import time affects every
# importer of this module; consider scoping with warnings.catch_warnings().
warnings.filterwarnings('ignore')


class StatisticalAnalyzer:
    """Performs statistical analysis on code metrics data.

    All tests use ``significance_level`` as the alpha threshold and
    ``confidence_level`` as the coverage probability for intervals.
    """

    def __init__(self, significance_level: float = 0.05, confidence_level: float = 0.95):
        # Alpha used for every "significant" decision below.
        self.significance_level = significance_level
        # Coverage probability for confidence intervals.
        self.confidence_level = confidence_level

    def prepare_dataframe(self, data: List[Dict]) -> pd.DataFrame:
        """Convert a list of per-file metrics dictionaries to a DataFrame."""
        return pd.DataFrame(data)

    def correlation_analysis(self, df: pd.DataFrame) -> Dict:
        """Correlate complexity metrics against issue metrics.

        Returns a dict with 'correlations', 'p_values', and
        'significant_correlations' (the subset whose p-value is below
        ``significance_level``). Metric pairs with fewer than 3 complete
        (non-NaN) rows are skipped.
        """
        results = {}

        complexity_metrics = [
            'loc', 'lloc', 'sloc', 'cyclomatic_complexity',
            'cognitive_complexity', 'max_complexity', 'avg_complexity',
            'max_inheritance_depth', 'maintainability_index'
        ]
        issue_metrics = ['fix_count', 'total_fixes']

        # Only analyze columns actually present in this DataFrame.
        complexity_cols = [col for col in complexity_metrics if col in df.columns]
        issue_cols = [col for col in issue_metrics if col in df.columns]

        correlations = {}
        p_values = {}

        for comp_col in complexity_cols:
            for issue_col in issue_cols:
                # Use only rows where both columns are non-NaN.
                mask = df[[comp_col, issue_col]].notna().all(axis=1)
                if mask.sum() < 3:  # Need at least 3 data points
                    continue

                x = df.loc[mask, comp_col]
                y = df.loc[mask, issue_col]

                # Pearson correlation (linear association).
                pearson_r, pearson_p = pearsonr(x, y)
                correlations[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_r
                p_values[f'{comp_col}_vs_{issue_col}_pearson'] = pearson_p

                # Spearman correlation (rank-based, non-parametric).
                spearman_r, spearman_p = spearmanr(x, y)
                correlations[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_r
                p_values[f'{comp_col}_vs_{issue_col}_spearman'] = spearman_p

        results['correlations'] = correlations
        results['p_values'] = p_values
        # Fixed: the original filtered keys through no-op str.replace()
        # chains; correlations and p_values share keys, so look up directly.
        results['significant_correlations'] = {
            k: v for k, v in correlations.items()
            if p_values.get(k, 1) < self.significance_level
        }

        return results

    def regression_analysis(self, df: pd.DataFrame,
                            complexity_features: List[str],
                            target: str = 'fix_count') -> Dict:
        """Fit OLS regression predicting ``target`` from complexity features.

        Features are standardized before fitting. Returns R^2, RMSE,
        coefficients, intercept, per-coefficient p-values, confidence
        intervals, and the significant feature names. Returns an empty dict
        when there are no usable features or too few complete rows.
        """
        # Imported lazily so the rest of the analyzer (scipy/pandas only)
        # remains usable when scikit-learn is not installed.
        from sklearn.linear_model import LinearRegression
        from sklearn.preprocessing import StandardScaler

        results = {}

        feature_cols = [col for col in complexity_features if col in df.columns]
        if not feature_cols:
            return results

        # Drop incomplete rows; need more rows than model parameters.
        mask = df[feature_cols + [target]].notna().all(axis=1)
        if mask.sum() < len(feature_cols) + 1:
            return results

        X = df.loc[mask, feature_cols]
        y = df.loc[mask, target]

        # Reduce multicollinearity: drop any feature whose absolute
        # correlation with an earlier-listed feature exceeds 0.95.
        if len(feature_cols) > 1:
            corr_matrix = X.corr().abs()
            upper_triangle = corr_matrix.where(
                np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
            )
            high_corr_features = [column for column in upper_triangle.columns
                                  if any(upper_triangle[column] > 0.95)]
            if high_corr_features:
                feature_cols = [f for f in feature_cols
                                if f not in high_corr_features]
                X = X[feature_cols]

        if len(feature_cols) == 0:
            return results

        # Standardize so coefficient magnitudes are comparable.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        model = LinearRegression()
        model.fit(X_scaled, y)

        y_pred = model.predict(X_scaled)

        r_squared = model.score(X_scaled, y)
        mse = np.mean((y - y_pred) ** 2)
        rmse = np.sqrt(mse)

        coefficients = dict(zip(feature_cols, model.coef_))
        intercept = model.intercept_

        # Inference: residual variance with n - p - 1 degrees of freedom.
        n = len(y)
        p = len(feature_cols)
        residuals = y - y_pred
        mse_residual = np.sum(residuals ** 2) / (n - p - 1)

        X_with_intercept = np.column_stack([np.ones(n), X_scaled])
        XTX = X_with_intercept.T @ X_with_intercept

        try:
            # Near-singular design matrices get the pseudo-inverse.
            if np.linalg.cond(XTX) > 1e12:
                cov_matrix = mse_residual * np.linalg.pinv(XTX)
            else:
                cov_matrix = mse_residual * np.linalg.inv(XTX)
            std_errors = np.sqrt(np.diag(cov_matrix))[1:]  # Skip intercept

            # Two-sided t-test on each coefficient.
            t_stats = model.coef_ / std_errors
            p_values = 2 * (1 - stats.t.cdf(np.abs(t_stats), n - p - 1))

            alpha = 1 - self.confidence_level
            t_critical = stats.t.ppf(1 - alpha / 2, n - p - 1)
            ci_lower = model.coef_ - t_critical * std_errors
            ci_upper = model.coef_ + t_critical * std_errors
        except (np.linalg.LinAlgError, ValueError):
            # Fall back to the pseudo-inverse outright.
            try:
                cov_matrix = mse_residual * np.linalg.pinv(XTX)
                std_errors = np.sqrt(np.diag(cov_matrix))[1:]
                # Guard against NaN or zero standard errors.
                std_errors = np.where(np.isnan(std_errors) | (std_errors == 0),
                                      np.inf, std_errors)
                t_stats = model.coef_ / std_errors
                p_values = np.where(np.isfinite(t_stats),
                                    2 * (1 - stats.t.cdf(np.abs(t_stats),
                                                         n - p - 1)),
                                    np.nan)
                alpha = 1 - self.confidence_level
                t_critical = stats.t.ppf(1 - alpha / 2, n - p - 1)
                ci_lower = model.coef_ - t_critical * std_errors
                ci_upper = model.coef_ + t_critical * std_errors
            except Exception:  # Fixed: was a bare except
                # Inference is impossible; report NaNs but keep fit stats.
                std_errors = np.full(len(feature_cols), np.nan)
                p_values = np.full(len(feature_cols), np.nan)
                ci_lower = np.full(len(feature_cols), np.nan)
                ci_upper = np.full(len(feature_cols), np.nan)

        results['r_squared'] = r_squared
        results['rmse'] = rmse
        results['coefficients'] = coefficients
        results['intercept'] = intercept
        results['p_values'] = dict(zip(feature_cols, p_values))
        results['confidence_intervals'] = {
            col: (lower, upper) for col, lower, upper in
            zip(feature_cols, ci_lower, ci_upper)
        }
        results['significant_features'] = [
            col for col, p_val in zip(feature_cols, p_values)
            if p_val < self.significance_level
        ]

        return results

    def hypothesis_testing(self, df: pd.DataFrame) -> Dict:
        """Test whether complexity differs significantly across modules.

        Runs one-way ANOVA and its non-parametric alternative
        (Kruskal-Wallis) per complexity metric, grouped by the 'module'
        column. Returns an empty dict if fewer than two modules exist.
        """
        results = {}

        if 'module' not in df.columns:
            return results

        modules = df['module'].unique()
        if len(modules) < 2:
            return results

        complexity_metrics = [
            'cyclomatic_complexity', 'cognitive_complexity',
            'avg_complexity', 'loc'
        ]

        for metric in complexity_metrics:
            if metric not in df.columns:
                continue

            # One array of non-NaN values per module; empty groups dropped.
            data_by_module = [
                df[df['module'] == module][metric].dropna().values
                for module in modules
                if len(df[df['module'] == module][metric].dropna()) > 0
            ]

            if len(data_by_module) < 2:
                continue

            # One-way ANOVA (parametric).
            try:
                f_stat, p_value = stats.f_oneway(*data_by_module)
                results[f'{metric}_anova'] = {
                    'f_statistic': float(f_stat),
                    'p_value': float(p_value),
                    'significant': p_value < self.significance_level
                }
            except Exception:  # Fixed: was bare except; degenerate groups skip
                pass

            # Kruskal-Wallis (non-parametric alternative).
            try:
                h_stat, p_value_kw = stats.kruskal(*data_by_module)
                results[f'{metric}_kruskal_wallis'] = {
                    'h_statistic': float(h_stat),
                    'p_value': float(p_value_kw),
                    'significant': p_value_kw < self.significance_level
                }
            except Exception:  # Fixed: was bare except; degenerate groups skip
                pass

        return results

    def t_test_analysis(self, df: pd.DataFrame) -> Dict:
        """
        Perform t-tests comparing complexity between high-fix and low-fix files.

        Files are split at the median 'fix_count' (high: strictly above;
        low: at or below). Uses Welch's t-test (unequal variances). Each
        group needs at least 2 non-NaN values per metric.
        """
        results = {}

        if 'fix_count' not in df.columns:
            return results

        # Median split into high-fix and low-fix groups.
        median_fixes = df['fix_count'].median()
        high_fix_mask = df['fix_count'] > median_fixes
        low_fix_mask = df['fix_count'] <= median_fixes

        complexity_metrics = [
            'loc', 'cyclomatic_complexity', 'cognitive_complexity',
            'avg_complexity', 'max_complexity', 'max_inheritance_depth'
        ]

        for metric in complexity_metrics:
            if metric not in df.columns:
                continue

            high_fix_data = df.loc[high_fix_mask, metric].dropna()
            low_fix_data = df.loc[low_fix_mask, metric].dropna()

            if len(high_fix_data) < 2 or len(low_fix_data) < 2:
                continue

            # Welch's independent-samples t-test (unequal variances).
            try:
                t_stat, p_value = stats.ttest_ind(high_fix_data, low_fix_data,
                                                  equal_var=False)
                results[f'{metric}_t_test'] = {
                    't_statistic': float(t_stat),
                    'p_value': float(p_value),
                    'significant': p_value < self.significance_level,
                    'high_fix_mean': float(high_fix_data.mean()),
                    'low_fix_mean': float(low_fix_data.mean()),
                    'high_fix_std': float(high_fix_data.std()),
                    'low_fix_std': float(low_fix_data.std()),
                    'high_fix_n': len(high_fix_data),
                    'low_fix_n': len(low_fix_data)
                }
            except Exception as e:
                results[f'{metric}_t_test'] = {'error': str(e)}

        return results

    def z_test_analysis(self, df: pd.DataFrame) -> Dict:
        """
        Perform z-tests comparing complexity between high-fix and low-fix files.

        The z-test assumes known population variance (sample variance is
        used as an approximation), so metrics are only tested when both
        groups have at least 30 observations.
        """
        results = {}

        if 'fix_count' not in df.columns:
            return results

        # Median split into high-fix and low-fix groups.
        median_fixes = df['fix_count'].median()
        high_fix_mask = df['fix_count'] > median_fixes
        low_fix_mask = df['fix_count'] <= median_fixes

        complexity_metrics = [
            'loc', 'cyclomatic_complexity', 'cognitive_complexity',
            'avg_complexity', 'max_complexity', 'max_inheritance_depth'
        ]

        for metric in complexity_metrics:
            if metric not in df.columns:
                continue

            high_fix_data = df.loc[high_fix_mask, metric].dropna()
            low_fix_data = df.loc[low_fix_mask, metric].dropna()

            if len(high_fix_data) < 30 or len(low_fix_data) < 30:
                # Z-test requires large sample sizes (n >= 30).
                continue

            try:
                mean1 = high_fix_data.mean()
                mean2 = low_fix_data.mean()
                std1 = high_fix_data.std()
                std2 = low_fix_data.std()
                n1 = len(high_fix_data)
                n2 = len(low_fix_data)

                # Standard error of the difference of means.
                se_diff = np.sqrt((std1**2 / n1) + (std2**2 / n2))

                z_stat = (mean1 - mean2) / se_diff

                # Two-tailed p-value from the standard normal.
                p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

                # Confidence interval for the mean difference.
                alpha = 1 - self.confidence_level
                z_critical = stats.norm.ppf(1 - alpha / 2)
                ci_lower = (mean1 - mean2) - z_critical * se_diff
                ci_upper = (mean1 - mean2) + z_critical * se_diff

                results[f'{metric}_z_test'] = {
                    'z_statistic': float(z_stat),
                    'p_value': float(p_value),
                    'significant': p_value < self.significance_level,
                    'high_fix_mean': float(mean1),
                    'low_fix_mean': float(mean2),
                    'mean_difference': float(mean1 - mean2),
                    'ci_lower': float(ci_lower),
                    'ci_upper': float(ci_upper),
                    'high_fix_n': n1,
                    'low_fix_n': n2
                }
            except Exception as e:
                results[f'{metric}_z_test'] = {'error': str(e)}

        return results

    def confidence_intervals(self, df: pd.DataFrame,
                             metrics: List[str]) -> Dict:
        """Calculate t-distribution confidence intervals for mean of metrics.

        Each metric needs at least 2 non-NaN values; others are skipped.
        """
        results = {}

        alpha = 1 - self.confidence_level

        for metric in metrics:
            if metric not in df.columns:
                continue

            data = df[metric].dropna()
            if len(data) < 2:
                continue

            # Mean and its standard error.
            mean = data.mean()
            std_err = stats.sem(data)

            # t-distribution interval with len(data) - 1 degrees of freedom.
            t_critical = stats.t.ppf(1 - alpha / 2, len(data) - 1)
            ci_lower = mean - t_critical * std_err
            ci_upper = mean + t_critical * std_err

            results[metric] = {
                'mean': mean,
                'std': data.std(),
                'ci_lower': ci_lower,
                'ci_upper': ci_upper,
                'confidence_level': self.confidence_level
            }

        return results

    def variance_covariance_analysis(self, df: pd.DataFrame) -> Dict:
        """Calculate variance-covariance and correlation matrices.

        Uses complete rows only; needs at least 2 metrics and 2 rows.
        """
        results = {}

        complexity_metrics = [
            'loc', 'cyclomatic_complexity', 'cognitive_complexity',
            'max_complexity', 'avg_complexity', 'max_inheritance_depth'
        ]

        metric_cols = [col for col in complexity_metrics if col in df.columns]

        if len(metric_cols) < 2:
            return results

        # Remove rows with any missing metric value.
        data = df[metric_cols].dropna()

        if len(data) < 2:
            return results

        cov_matrix = data.cov()
        corr_matrix = data.corr()

        results['covariance_matrix'] = cov_matrix
        results['correlation_matrix'] = corr_matrix
        results['variances'] = data.var().to_dict()

        return results

    def pivot_table_analysis(self, df: pd.DataFrame) -> Dict:
        """Create pivot tables for cross-tabulation analysis.

        Builds module-vs-complexity-category and module-vs-fix-count-bucket
        mean pivots. Fixed: the original assigned df['complexity_category']
        as a side effect on the caller's DataFrame; the binned category is
        now a local Series.
        """
        results = {}

        if 'module' not in df.columns:
            return results

        if 'cyclomatic_complexity' in df.columns:
            # Bin complexity without mutating the caller's DataFrame.
            complexity_category = pd.cut(
                df['cyclomatic_complexity'],
                bins=[0, 10, 25, 50, float('inf')],
                labels=['Low', 'Medium', 'High', 'Very High']
            ).rename('complexity_category')

            # Pivot: mean complexity per module x category.
            pivot = pd.crosstab(df['module'], complexity_category,
                                values=df['cyclomatic_complexity'],
                                aggfunc='mean')
            results['module_complexity_pivot'] = pivot

        # Pivot: mean fix count per module x fix-count bucket.
        if 'fix_count' in df.columns:
            pivot_fixes = pd.crosstab(
                df['module'],
                pd.cut(df['fix_count'],
                       bins=[0, 1, 5, 10, float('inf')],
                       labels=['None', 'Low', 'Medium', 'High']),
                values=df['fix_count'],
                aggfunc='mean'
            )
            results['module_fixes_pivot'] = pivot_fixes

        return results

    def discrete_distribution_analysis(self, df: pd.DataFrame) -> Dict:
        """Analyze the discrete distribution of fix counts.

        Fits a Poisson distribution (lambda = sample mean) and runs a
        chi-square goodness-of-fit test over the bins whose expected
        frequency is at least 5; also reports summary statistics.
        """
        results = {}

        if 'fix_count' not in df.columns:
            return results

        issue_counts = df['fix_count'].dropna()

        # Poisson fit: MLE for lambda is the sample mean.
        lambda_poisson = issue_counts.mean()
        poisson_dist = stats.poisson(lambda_poisson)

        observed_freq = issue_counts.value_counts().sort_index()
        max_observed = int(observed_freq.index.max())

        # Keep only bins with expected frequency >= 5 (chi-square validity).
        observed_array = []
        expected_array = []

        for k in range(max_observed + 1):
            obs_count = observed_freq.get(k, 0)
            exp_count = poisson_dist.pmf(k) * len(issue_counts)

            if exp_count >= 5:
                observed_array.append(obs_count)
                expected_array.append(exp_count)

        if len(observed_array) > 0 and len(expected_array) > 0:
            observed_sum = sum(observed_array)
            expected_sum = sum(expected_array)

            if expected_sum > 0:
                # Scale expected frequencies to match the observed total,
                # as scipy.stats.chisquare requires equal sums.
                expected_array = np.array(expected_array) * (observed_sum / expected_sum)
                observed_array = np.array(observed_array)

                if abs(sum(observed_array) - sum(expected_array)) < 1e-6:
                    try:
                        chi2_stat, p_value = stats.chisquare(
                            observed_array,
                            expected_array
                        )

                        results['poisson_fit'] = {
                            'lambda': lambda_poisson,
                            'chi2_statistic': float(chi2_stat),
                            'p_value': float(p_value),
                            # "fits" means we fail to reject the Poisson model.
                            'fits': p_value >= self.significance_level
                        }
                    except (ValueError, RuntimeError) as e:
                        # Chi-square test failed; record the error instead.
                        results['poisson_fit'] = {
                            'lambda': lambda_poisson,
                            'chi2_statistic': None,
                            'p_value': None,
                            'fits': None,
                            'error': str(e)
                        }

        results['distribution_summary'] = {
            'mean': issue_counts.mean(),
            'variance': issue_counts.var(),
            'std': issue_counts.std(),
            'skewness': stats.skew(issue_counts),
            'kurtosis': stats.kurtosis(issue_counts)
        }

        return results