""" Code metrics analyzer for Python files. Analyzes LOC, complexity, and other metrics. """ import os import ast from typing import Dict, List, Optional from pathlib import Path from concurrent.futures import ProcessPoolExecutor, as_completed from functools import partial import radon.complexity as radon_complexity from radon.metrics import mi_visit from radon.raw import analyze class CodeAnalyzer: """Analyzer for Python code metrics.""" def __init__(self, exclude_dirs: Optional[List[str]] = None): self.exclude_dirs = exclude_dirs or [] def should_analyze(self, file_path: str) -> bool: """Check if a file should be analyzed.""" path = Path(file_path) # Check if in excluded directory for part in path.parts: if part in self.exclude_dirs: return False # Check if Python file return path.suffix == '.py' def analyze_file(self, file_path: str) -> Optional[Dict]: """Analyze a single Python file and return metrics.""" if not self.should_analyze(file_path): return None try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() if not code.strip(): return None metrics = {} # Lines of Code (LOC) raw_metrics = analyze(code) metrics['loc'] = raw_metrics.loc metrics['lloc'] = raw_metrics.lloc # Logical lines of code metrics['sloc'] = raw_metrics.sloc # Source lines of code metrics['comments'] = raw_metrics.comments metrics['blank_lines'] = raw_metrics.blank # Cyclomatic Complexity using Radon try: complexity_results = radon_complexity.cc_visit(code) total_complexity = sum(func.complexity for func in complexity_results) avg_complexity = (total_complexity / len(complexity_results) if complexity_results else 0) max_complexity = max((func.complexity for func in complexity_results), default=0) metrics['cyclomatic_complexity'] = total_complexity metrics['avg_complexity'] = avg_complexity metrics['max_complexity'] = max_complexity metrics['functions'] = len(complexity_results) except: metrics['cyclomatic_complexity'] = 0 metrics['avg_complexity'] = 0 metrics['max_complexity'] = 0 metrics['functions'] = 0 # Cognitive Complexity calculation # Note: Lizard doesn't provide cognitive complexity directly # We calculate it based on cyclomatic complexity + nesting depth penalty # Cognitive complexity penalizes nesting more heavily than cyclomatic complexity try: import lizard lizard_result = lizard.analyze_file(file_path) if lizard_result and lizard_result.function_list: total_cognitive = 0 for func in lizard_result.function_list: # Get cyclomatic complexity (base) base_cc = getattr(func, 'cyclomatic_complexity', 1) # Get nesting depth (cognitive complexity penalizes nesting) nesting_depth = getattr(func, 'max_nesting_depth', 0) # Cognitive complexity formula: CC + (nesting_depth * 2) # This approximates cognitive complexity by penalizing deep nesting cognitive = base_cc + (nesting_depth * 2) total_cognitive += max(0, cognitive) # Ensure non-negative metrics['cognitive_complexity'] = total_cognitive metrics['avg_cognitive_complexity'] = ( total_cognitive / len(lizard_result.function_list) if lizard_result.function_list else 0 ) else: # No functions found, set to 0 metrics['cognitive_complexity'] = 0 metrics['avg_cognitive_complexity'] = 0 except Exception as e: # If lizard fails, use cyclomatic complexity as fallback # This ensures we have some complexity metric even if lizard fails metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0) metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0) # Maintainability Index try: metrics['maintainability_index'] = mi_visit(code, multi=True) except: metrics['maintainability_index'] = 0 # Depth of Inheritance (for classes) try: tree = ast.parse(code) max_depth = self._calculate_inheritance_depth(tree) metrics['max_inheritance_depth'] = max_depth metrics['classes'] = len([node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]) except: metrics['max_inheritance_depth'] = 0 metrics['classes'] = 0 # File path components for module analysis path_parts = Path(file_path).parts metrics['file_path'] = file_path metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root' metrics['filename'] = path_parts[-1] return metrics except Exception as e: print(f"Error analyzing {file_path}: {e}") return None def _calculate_inheritance_depth(self, tree: ast.AST) -> int: """Calculate maximum inheritance depth in the AST.""" max_depth = 0 for node in ast.walk(tree): if isinstance(node, ast.ClassDef): depth = self._get_class_depth(node, tree) max_depth = max(max_depth, depth) return max_depth def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST) -> int: """Get inheritance depth for a specific class.""" if not class_node.bases: return 1 max_base_depth = 0 for base in class_node.bases: if isinstance(base, ast.Name): # Find the base class definition for node in ast.walk(tree): if isinstance(node, ast.ClassDef) and node.name == base.id: base_depth = self._get_class_depth(node, tree) max_base_depth = max(max_base_depth, base_depth) break return max_base_depth + 1 def analyze_directory(self, directory: str, parallel: bool = True, max_workers: Optional[int] = None) -> List[Dict]: """Analyze all Python files in a directory recursively.""" directory_path = Path(directory) # Collect all Python files to analyze files_to_analyze = [ str(file_path) for file_path in directory_path.rglob('*.py') if self.should_analyze(str(file_path)) ] if not files_to_analyze: return [] if parallel and len(files_to_analyze) > 1: # Parallel analysis metrics_list = [] with ProcessPoolExecutor(max_workers=max_workers) as executor: # Create a partial function with the analyzer instance's method analyze_func = partial(_analyze_file_wrapper, exclude_dirs=self.exclude_dirs) futures = {executor.submit(analyze_func, file_path): file_path for file_path in files_to_analyze} for future in as_completed(futures): try: metrics = future.result() if metrics: metrics_list.append(metrics) except Exception as e: file_path = futures[future] print(f" Warning: Error analyzing {file_path}: {e}") return metrics_list else: # Sequential analysis metrics_list = [] for file_path in files_to_analyze: metrics = self.analyze_file(file_path) if metrics: metrics_list.append(metrics) return metrics_list def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]: """Wrapper function for parallel file analysis.""" analyzer = CodeAnalyzer(exclude_dirs=exclude_dirs) return analyzer.analyze_file(file_path)