"""
Code metrics analyzer for Python files.
Analyzes LOC, complexity, and other metrics.
"""
import os
import ast
from typing import Dict, List, Optional
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import radon.complexity as radon_complexity
from radon.metrics import mi_visit
from radon.raw import analyze
class CodeAnalyzer:
    """Collects size, complexity, and structure metrics for Python files.

    Metrics come from three sources:
      * radon  — raw line counts, cyclomatic complexity, maintainability index
      * lizard — an approximation of cognitive complexity (optional; falls
        back to the cyclomatic numbers when lizard is unavailable)
      * ast    — class counts and inheritance depth within a single module
    """

    def __init__(self, exclude_dirs: Optional[List[str]] = None):
        # Directory names (individual path components) whose files are skipped.
        self.exclude_dirs = exclude_dirs or []

    def should_analyze(self, file_path: str) -> bool:
        """Return True if *file_path* is a ``.py`` file outside every excluded directory."""
        path = Path(file_path)
        if any(part in self.exclude_dirs for part in path.parts):
            return False
        return path.suffix == '.py'

    def analyze_file(self, file_path: str) -> Optional[Dict]:
        """Analyze a single Python file and return a dict of metrics.

        Returns None when the file is excluded, empty, or cannot be read;
        any unexpected error is reported and also yields None.
        """
        if not self.should_analyze(file_path):
            return None
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()
            if not code.strip():
                return None
            metrics: Dict = {}
            self._add_raw_metrics(metrics, code)
            self._add_cyclomatic_metrics(metrics, code)
            self._add_cognitive_metrics(metrics, file_path)
            self._add_maintainability(metrics, code)
            self._add_class_metrics(metrics, code)
            # File path components for module-level aggregation.
            path_parts = Path(file_path).parts
            metrics['file_path'] = file_path
            metrics['module'] = path_parts[-2] if len(path_parts) > 1 else 'root'
            metrics['filename'] = path_parts[-1]
            return metrics
        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
            return None

    def _add_raw_metrics(self, metrics: Dict, code: str) -> None:
        """Populate line-count metrics from radon's raw analysis."""
        raw_metrics = analyze(code)
        metrics['loc'] = raw_metrics.loc
        metrics['lloc'] = raw_metrics.lloc  # Logical lines of code
        metrics['sloc'] = raw_metrics.sloc  # Source lines of code
        metrics['comments'] = raw_metrics.comments
        metrics['blank_lines'] = raw_metrics.blank

    def _add_cyclomatic_metrics(self, metrics: Dict, code: str) -> None:
        """Populate cyclomatic-complexity metrics via radon; zeros on failure."""
        try:
            results = radon_complexity.cc_visit(code)
            total = sum(func.complexity for func in results)
            metrics['cyclomatic_complexity'] = total
            metrics['avg_complexity'] = total / len(results) if results else 0
            metrics['max_complexity'] = max(
                (func.complexity for func in results), default=0)
            metrics['functions'] = len(results)
        except Exception:
            metrics['cyclomatic_complexity'] = 0
            metrics['avg_complexity'] = 0
            metrics['max_complexity'] = 0
            metrics['functions'] = 0

    def _add_cognitive_metrics(self, metrics: Dict, file_path: str) -> None:
        """Approximate cognitive complexity using lizard.

        Lizard does not report cognitive complexity directly, so we use
        cyclomatic complexity plus a nesting-depth penalty (CC + 2*depth) —
        cognitive complexity penalizes nesting more heavily than cyclomatic.
        Falls back to the already-computed cyclomatic numbers if lizard
        is missing or fails.
        """
        try:
            import lizard
            lizard_result = lizard.analyze_file(file_path)
            functions = lizard_result.function_list if lizard_result else []
            if functions:
                total_cognitive = 0
                for func in functions:
                    base_cc = getattr(func, 'cyclomatic_complexity', 1)
                    nesting_depth = getattr(func, 'max_nesting_depth', 0)
                    # Clamp to non-negative in case of odd lizard output.
                    total_cognitive += max(0, base_cc + nesting_depth * 2)
                metrics['cognitive_complexity'] = total_cognitive
                metrics['avg_cognitive_complexity'] = total_cognitive / len(functions)
            else:
                # No functions found in the file.
                metrics['cognitive_complexity'] = 0
                metrics['avg_cognitive_complexity'] = 0
        except Exception:
            metrics['cognitive_complexity'] = metrics.get('cyclomatic_complexity', 0)
            metrics['avg_cognitive_complexity'] = metrics.get('avg_complexity', 0)

    def _add_maintainability(self, metrics: Dict, code: str) -> None:
        """Populate the maintainability index via radon; zero on failure."""
        try:
            metrics['maintainability_index'] = mi_visit(code, multi=True)
        except Exception:
            metrics['maintainability_index'] = 0

    def _add_class_metrics(self, metrics: Dict, code: str) -> None:
        """Populate class count and maximum inheritance depth; zeros on failure."""
        try:
            tree = ast.parse(code)
            metrics['max_inheritance_depth'] = self._calculate_inheritance_depth(tree)
            metrics['classes'] = sum(
                isinstance(node, ast.ClassDef) for node in ast.walk(tree))
        except Exception:
            metrics['max_inheritance_depth'] = 0
            metrics['classes'] = 0

    def _calculate_inheritance_depth(self, tree: ast.AST) -> int:
        """Return the maximum inheritance depth over all classes in *tree*."""
        max_depth = 0
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                max_depth = max(max_depth, self._get_class_depth(node, tree))
        return max_depth

    def _get_class_depth(self, class_node: ast.ClassDef, tree: ast.AST,
                         _visiting: Optional[set] = None) -> int:
        """Return the inheritance depth of *class_node* (base-less classes are 1).

        Only bases written as plain names and defined in the same module are
        followed. *_visiting* holds the names currently on the recursion
        stack, guarding against textually cyclic base references (e.g.
        ``class A(B)`` / ``class B(A)``) that would otherwise recurse forever.
        """
        if not class_node.bases:
            return 1
        visiting = _visiting or set()
        if class_node.name in visiting:
            return 0  # Cycle detected: stop without adding further depth.
        visiting = visiting | {class_node.name}
        max_base_depth = 0
        for base in class_node.bases:
            if isinstance(base, ast.Name):
                # Find the base class definition in the same module.
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef) and node.name == base.id:
                        max_base_depth = max(
                            max_base_depth,
                            self._get_class_depth(node, tree, visiting))
                        break
        return max_base_depth + 1

    def analyze_directory(self, directory: str, parallel: bool = True,
                          max_workers: Optional[int] = None) -> List[Dict]:
        """Analyze all Python files under *directory* recursively.

        When *parallel* is true and more than one file is found, files are
        analyzed in a process pool (the metric extraction is CPU-bound).
        Files that fail to analyze are skipped with a warning.
        """
        files_to_analyze = [
            str(file_path) for file_path in Path(directory).rglob('*.py')
            if self.should_analyze(str(file_path))
        ]
        if not files_to_analyze:
            return []
        if parallel and len(files_to_analyze) > 1:
            metrics_list: List[Dict] = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # Bind exclude_dirs so worker processes can rebuild an analyzer;
                # the wrapper is module-level and therefore picklable.
                analyze_func = partial(_analyze_file_wrapper,
                                       exclude_dirs=self.exclude_dirs)
                futures = {executor.submit(analyze_func, file_path): file_path
                           for file_path in files_to_analyze}
                for future in as_completed(futures):
                    try:
                        metrics = future.result()
                        if metrics:
                            metrics_list.append(metrics)
                    except Exception as e:
                        file_path = futures[future]
                        print(f" Warning: Error analyzing {file_path}: {e}")
            return metrics_list
        # Sequential analysis.
        return [m for m in map(self.analyze_file, files_to_analyze) if m]
def _analyze_file_wrapper(file_path: str, exclude_dirs: List[str]) -> Optional[Dict]:
    """Module-level helper so ProcessPoolExecutor can pickle the work item.

    Builds a fresh CodeAnalyzer in the worker process and delegates the
    single-file analysis to it.
    """
    return CodeAnalyzer(exclude_dirs=exclude_dirs).analyze_file(file_path)