aboutsummaryrefslogtreecommitdiff
path: root/code_analyzer.py
diff options
context:
space:
mode:
authorFuwn <[email protected]>2025-12-09 23:16:23 -0800
committerFuwn <[email protected]>2025-12-09 23:16:23 -0800
commit3ffcdb247df3f56c4c21c6fed83ee1af5fb94224 (patch)
tree409fe42bb385ca73bd1b152623465ee098434179 /code_analyzer.py
downloadmathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.tar.xz
mathematicalstatisticsproject-3ffcdb247df3f56c4c21c6fed83ee1af5fb94224.zip
feat: Initial commitHEADmain
Diffstat (limited to 'code_analyzer.py')
-rw-r--r--code_analyzer.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/code_analyzer.py b/code_analyzer.py
new file mode 100644
index 0000000..5b0b313
--- /dev/null
+++ b/code_analyzer.py
@@ -0,0 +1,213 @@
+"""
+Code metrics analyzer for Python files.
+Analyzes LOC, complexity, and other metrics.
+"""
+import os
+import ast
+from typing import Dict, List, Optional
+from pathlib import Path
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from functools import partial
+import radon.complexity as radon_complexity
+from radon.metrics import mi_visit
+from radon.raw import analyze
+
+
class CodeAnalyzer:
    """Analyzer for Python code metrics.

    Computes raw line counts, cyclomatic/cognitive complexity, the
    maintainability index, and in-module inheritance depth for Python files,
    either one file at a time or recursively over a directory tree.
    """

    def __init__(self, exclude_dirs: Optional[List[str]] = None):
        """
        Args:
            exclude_dirs: Directory names (single path components) whose
                contents are skipped entirely, e.g. ['venv', '.git'].
        """
        self.exclude_dirs = exclude_dirs or []

    def should_analyze(self, file_path: str) -> bool:
        """Return True for a .py file that is not under an excluded directory."""
        path = Path(file_path)

        # Reject anything located under an excluded directory component.
        if any(part in self.exclude_dirs for part in path.parts):
            return False

        # Only Python source files are analyzed.
        return path.suffix == '.py'

    def analyze_file(self, file_path: str) -> Optional[Dict]:
        """Analyze a single Python file and return a dict of metrics.

        Returns:
            A metrics dict, or None if the file is excluded, empty, or
            unreadable. Each metric group degrades to 0 on tool failure
            rather than aborting the whole file.
        """
        if not self.should_analyze(file_path):
            return None

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if not code.strip():
                return None

            metrics = {}

            # Lines of Code (LOC) via radon.raw.
            raw_metrics = analyze(code)
            metrics['loc'] = raw_metrics.loc
            metrics['lloc'] = raw_metrics.lloc  # Logical lines of code
            metrics['sloc'] = raw_metrics.sloc  # Source lines of code
            metrics['comments'] = raw_metrics.comments
            metrics['blank_lines'] = raw_metrics.blank

            # Cyclomatic Complexity using Radon.
            try:
                complexity_results = radon_complexity.cc_visit(code)
                total_complexity = sum(func.complexity for func in complexity_results)
                avg_complexity = (total_complexity / len(complexity_results)
                                  if complexity_results else 0)
                max_complexity = max((func.complexity for func in complexity_results),
                                     default=0)

                metrics['cyclomatic_complexity'] = total_complexity
                metrics['avg_complexity'] = avg_complexity
                metrics['max_complexity'] = max_complexity
                metrics['functions'] = len(complexity_results)
            except Exception:
                # Fix: was a bare `except:`, which also swallows SystemExit
                # and KeyboardInterrupt; narrowed to Exception.
                metrics['cyclomatic_complexity'] = 0
                metrics['avg_complexity'] = 0
                metrics['max_complexity'] = 0
                metrics['functions'] = 0

            # Cognitive Complexity calculation.
            # Lizard doesn't provide cognitive complexity directly, so we
            # approximate it as cyclomatic complexity plus a nesting-depth
            # penalty (cognitive complexity punishes nesting more heavily
            # than cyclomatic complexity does).
            try:
                import lizard
                lizard_result = lizard.analyze_file(file_path)

                if lizard_result and lizard_result.function_list:
                    total_cognitive = 0
                    for func in lizard_result.function_list:
                        # Cyclomatic complexity is the base term.
                        base_cc = getattr(func, 'cyclomatic_complexity', 1)
                        # Deep nesting is what cognitive complexity penalizes.
                        nesting_depth = getattr(func, 'max_nesting_depth', 0)
                        # Approximation: CC + 2 * nesting depth.
                        cognitive = base_cc + (nesting_depth * 2)
                        total_cognitive += max(0, cognitive)  # Ensure non-negative

                    metrics['cognitive_complexity'] = total_cognitive
                    # function_list is non-empty in this branch, so the
                    # division is safe (removed a redundant conditional).
                    metrics['avg_cognitive_complexity'] = (
                        total_cognitive / len(lizard_result.function_list)
                    )
                else:
                    # No functions found in the file.
                    metrics['cognitive_complexity'] = 0
                    metrics['avg_cognitive_complexity'] = 0
            except Exception:
                # If lizard is unavailable or fails, fall back to the
                # cyclomatic figures so some complexity metric is reported.
                metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0)
                metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0)

            # Maintainability Index (radon).
            try:
                metrics['maintainability_index'] = mi_visit(code, multi=True)
            except Exception:  # narrowed from bare except
                metrics['maintainability_index'] = 0

            # Depth of Inheritance (for classes defined in this file).
            try:
                tree = ast.parse(code)
                metrics['max_inheritance_depth'] = self._calculate_inheritance_depth(tree)
                metrics['classes'] = len([node for node in ast.walk(tree)
                                          if isinstance(node, ast.ClassDef)])
            except Exception:  # e.g. SyntaxError from ast.parse
                metrics['max_inheritance_depth'] = 0
                metrics['classes'] = 0

            # File path components for module-level aggregation.
            path_parts = Path(file_path).parts
            metrics['file_path'] = file_path
            metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root'
            metrics['filename'] = path_parts[-1]

            return metrics

        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
            return None

    def _calculate_inheritance_depth(self, tree: ast.AST) -> int:
        """Calculate the maximum inheritance depth over all classes in *tree*."""
        max_depth = 0

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                max_depth = max(max_depth, self._get_class_depth(node, tree))

        return max_depth

    def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST,
                         _seen: Optional[set] = None) -> int:
        """Get inheritance depth for a specific class (1 for a base-less class).

        Only bases named by a plain identifier (ast.Name) and defined in the
        same module are followed. ``_seen`` tracks class names already on the
        recursion path: cyclic declarations such as ``class A(B)`` /
        ``class B(A)`` previously recursed forever (RecursionError); the
        guard stops the cycle while keeping acyclic results unchanged.
        """
        seen = set() if _seen is None else _seen
        if class_node.name in seen:
            return 0  # Cycle detected: do not count this class again.
        seen = seen | {class_node.name}

        if not class_node.bases:
            return 1

        max_base_depth = 0
        for base in class_node.bases:
            if isinstance(base, ast.Name):
                # Find the base class definition in the same module.
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef) and node.name == base.id:
                        max_base_depth = max(max_base_depth,
                                             self._get_class_depth(node, tree, seen))
                        break

        return max_base_depth + 1

    def analyze_directory(self, directory: str, parallel: bool = True,
                          max_workers: Optional[int] = None) -> List[Dict]:
        """Analyze all Python files in *directory* recursively.

        Args:
            directory: Root directory to scan.
            parallel: Use a process pool when more than one file is found.
            max_workers: Worker-count cap for the pool (None = executor default).

        Returns:
            A list of metrics dicts, one per successfully analyzed file.
        """
        directory_path = Path(directory)

        # Collect candidates up front so we can decide whether to parallelize.
        files_to_analyze = [
            str(file_path) for file_path in directory_path.rglob('*.py')
            if self.should_analyze(str(file_path))
        ]

        if not files_to_analyze:
            return []

        if parallel and len(files_to_analyze) > 1:
            # Parallel analysis. A module-level wrapper is submitted because
            # bound methods do not pickle cleanly across processes.
            metrics_list = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                analyze_func = partial(_analyze_file_wrapper, exclude_dirs=self.exclude_dirs)
                futures = {executor.submit(analyze_func, file_path): file_path
                           for file_path in files_to_analyze}

                for future in as_completed(futures):
                    try:
                        metrics = future.result()
                        if metrics:
                            metrics_list.append(metrics)
                    except Exception as e:
                        file_path = futures[future]
                        print(f" Warning: Error analyzing {file_path}: {e}")

            return metrics_list
        else:
            # Sequential analysis.
            metrics_list = []
            for file_path in files_to_analyze:
                metrics = self.analyze_file(file_path)
                if metrics:
                    metrics_list.append(metrics)
            return metrics_list
+
+
def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]:
    """Module-level helper for parallel analysis.

    A fresh CodeAnalyzer is built inside the worker process and the call is
    delegated to it; being a top-level function makes this picklable for
    ProcessPoolExecutor, which a bound method would not be.
    """
    worker = CodeAnalyzer(exclude_dirs=exclude_dirs)
    return worker.analyze_file(file_path)
+