"""
Code metrics analyzer for Python files.
Analyzes LOC, complexity, and other metrics.
"""
import ast
from typing import Dict, List, Optional
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial

import radon.complexity as radon_complexity
from radon.metrics import mi_visit
from radon.raw import analyze


class CodeAnalyzer:
    """Analyzer for Python code metrics (LOC, complexity, maintainability)."""

    def __init__(self, exclude_dirs: Optional[List[str]] = None):
        """
        Args:
            exclude_dirs: Directory names whose contents are skipped entirely.
        """
        self.exclude_dirs = exclude_dirs or []

    def should_analyze(self, file_path: str) -> bool:
        """Return True if *file_path* is a ``.py`` file outside excluded dirs."""
        path = Path(file_path)

        # Any single path component matching an excluded name disqualifies the file.
        if any(part in self.exclude_dirs for part in path.parts):
            return False

        return path.suffix == '.py'

    def analyze_file(self, file_path: str) -> Optional[Dict]:
        """Analyze a single Python file and return a metrics dict.

        Returns None when the file is excluded, empty, or unreadable.
        Individual metric groups degrade to zeros/fallbacks on failure so
        one bad parser never discards the whole file's metrics.
        """
        if not self.should_analyze(file_path):
            return None

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if not code.strip():
                return None

            metrics: Dict = {}

            # Raw line counts (Lines of Code) via radon.
            raw_metrics = analyze(code)
            metrics['loc'] = raw_metrics.loc
            metrics['lloc'] = raw_metrics.lloc  # Logical lines of code
            metrics['sloc'] = raw_metrics.sloc  # Source lines of code
            metrics['comments'] = raw_metrics.comments
            metrics['blank_lines'] = raw_metrics.blank

            # Cyclomatic complexity using radon.
            self._add_cyclomatic_metrics(code, metrics)

            # Cognitive complexity approximation using lizard (falls back to
            # the cyclomatic numbers just computed when lizard is unusable).
            self._add_cognitive_metrics(file_path, metrics)

            # Maintainability Index (multi=True treats multi-line strings
            # as comments for the MI formula).
            try:
                metrics['maintainability_index'] = mi_visit(code, multi=True)
            except Exception:
                metrics['maintainability_index'] = 0

            # Depth of inheritance and class count from the AST.
            try:
                tree = ast.parse(code)
                metrics['max_inheritance_depth'] = self._calculate_inheritance_depth(tree)
                metrics['classes'] = sum(
                    isinstance(node, ast.ClassDef) for node in ast.walk(tree))
            except SyntaxError:
                # File does not parse as the running interpreter's Python.
                metrics['max_inheritance_depth'] = 0
                metrics['classes'] = 0

            # File path components for module-level aggregation downstream.
            path_parts = Path(file_path).parts
            metrics['file_path'] = file_path
            metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root'
            metrics['filename'] = path_parts[-1]

            return metrics

        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
            return None

    @staticmethod
    def _add_cyclomatic_metrics(code: str, metrics: Dict) -> None:
        """Populate cyclomatic-complexity metrics from radon; zeros on failure."""
        try:
            complexity_results = radon_complexity.cc_visit(code)
            total = sum(func.complexity for func in complexity_results)
            metrics['cyclomatic_complexity'] = total
            metrics['avg_complexity'] = (
                total / len(complexity_results) if complexity_results else 0)
            metrics['max_complexity'] = max(
                (func.complexity for func in complexity_results), default=0)
            metrics['functions'] = len(complexity_results)
        except Exception:
            metrics['cyclomatic_complexity'] = 0
            metrics['avg_complexity'] = 0
            metrics['max_complexity'] = 0
            metrics['functions'] = 0

    @staticmethod
    def _add_cognitive_metrics(file_path: str, metrics: Dict) -> None:
        """Populate an approximate cognitive-complexity metric via lizard.

        Lizard does not report cognitive complexity directly, so each
        function is scored as its cyclomatic complexity plus a nesting
        penalty (``max_nesting_depth * 2``) — cognitive complexity
        penalizes nesting more heavily than cyclomatic complexity does.
        If lizard is unavailable or fails, falls back to the cyclomatic
        numbers so some complexity metric is always present.
        """
        try:
            import lizard  # optional dependency; imported lazily on purpose

            lizard_result = lizard.analyze_file(file_path)
            functions = lizard_result.function_list if lizard_result else []

            if functions:
                total_cognitive = 0
                for func in functions:
                    base_cc = getattr(func, 'cyclomatic_complexity', 1)
                    nesting_depth = getattr(func, 'max_nesting_depth', 0)
                    # Ensure a non-negative per-function contribution.
                    total_cognitive += max(0, base_cc + nesting_depth * 2)

                metrics['cognitive_complexity'] = total_cognitive
                metrics['avg_cognitive_complexity'] = total_cognitive / len(functions)
            else:
                # No functions found in the file.
                metrics['cognitive_complexity'] = 0
                metrics['avg_cognitive_complexity'] = 0
        except Exception:
            # Fallback: reuse the cyclomatic metrics computed earlier.
            metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0)
            metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0)

    def _calculate_inheritance_depth(self, tree: ast.AST) -> int:
        """Return the maximum inheritance depth across all classes in *tree*."""
        return max(
            (self._get_class_depth(node, tree)
             for node in ast.walk(tree) if isinstance(node, ast.ClassDef)),
            default=0,
        )

    def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST,
                         _visiting: Optional[set] = None) -> int:
        """Return the inheritance depth of *class_node* within *tree*.

        Only bases that are plain names defined in the same tree are
        resolved; unresolved/external bases contribute depth 1.
        *_visiting* guards against infinite recursion on pathological
        self- or mutually-referencing class names.
        """
        if _visiting is None:
            _visiting = set()
        if class_node.name in _visiting:
            return 0  # cycle guard: already being resolved higher up the stack
        if not class_node.bases:
            return 1

        _visiting.add(class_node.name)
        max_base_depth = 0
        for base in class_node.bases:
            if isinstance(base, ast.Name):
                # Resolve the base class definition within the same module.
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef) and node.name == base.id:
                        max_base_depth = max(
                            max_base_depth,
                            self._get_class_depth(node, tree, _visiting))
                        break

        return max_base_depth + 1

    def analyze_directory(self, directory: str, parallel: bool = True,
                          max_workers: Optional[int] = None) -> List[Dict]:
        """Analyze all Python files in a directory recursively.

        Args:
            directory: Root directory to scan.
            parallel: Use a process pool when more than one file is found.
            max_workers: Worker cap forwarded to ``ProcessPoolExecutor``.

        Returns:
            Per-file metrics dicts; order is not guaranteed when running
            in parallel (results are collected as futures complete).
        """
        files_to_analyze = [
            str(file_path) for file_path in Path(directory).rglob('*.py')
            if self.should_analyze(str(file_path))
        ]

        if not files_to_analyze:
            return []

        if parallel and len(files_to_analyze) > 1:
            metrics_list: List[Dict] = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # Submit via the module-level wrapper so the task is picklable
                # (bound methods of this instance are not reliably picklable).
                analyze_func = partial(_analyze_file_wrapper,
                                       exclude_dirs=self.exclude_dirs)
                futures = {executor.submit(analyze_func, file_path): file_path
                           for file_path in files_to_analyze}

                for future in as_completed(futures):
                    try:
                        metrics = future.result()
                    except Exception as e:
                        file_path = futures[future]
                        print(f" Warning: Error analyzing {file_path}: {e}")
                        continue
                    if metrics:
                        metrics_list.append(metrics)

            return metrics_list

        # Sequential analysis.
        return [m for m in map(self.analyze_file, files_to_analyze) if m]


def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]:
    """Module-level wrapper so ``ProcessPoolExecutor`` can pickle the task."""
    return CodeAnalyzer(exclude_dirs=exclude_dirs).analyze_file(file_path)