1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
"""
Data collection pipeline that combines code metrics with git commit log data.
"""
import os
import subprocess
import shutil
import tempfile
import re
from typing import List, Dict, Optional, Set
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from code_analyzer import CodeAnalyzer
from config import EXCLUDE_DIRS
class DataCollector:
"""Collects and combines code metrics with git commit data."""
def __init__(self, code_analyzer: CodeAnalyzer):
self.code_analyzer = code_analyzer
self.temp_dir = None
def clone_repository(self, owner: str, repo: str) -> Optional[str]:
"""Clone a repository to a temporary directory with full history."""
if not self.temp_dir:
self.temp_dir = tempfile.mkdtemp()
repo_url = f"https://github.com/{owner}/{repo}.git"
clone_path = os.path.join(self.temp_dir, repo)
try:
if os.path.exists(clone_path):
shutil.rmtree(clone_path)
# Clone with full history (needed for commit analysis)
subprocess.run(
['git', 'clone', repo_url, clone_path],
check=True,
capture_output=True,
timeout=600
)
return clone_path
except Exception as e:
print(f"Error cloning {owner}/{repo}: {e}")
return None
def get_fix_commits(self, repo_path: str) -> Dict[str, Set[str]]:
"""
Analyze git commit logs to find commits with 'fix' in the message
using semantic commit formats (fix:, fix(scope):, Fix:, etc.)
and track which files were changed in those commits.
Returns a dictionary mapping file paths to sets of commit hashes.
"""
file_fix_commits: Dict[str, Set[str]] = {}
try:
# Get all commits - we'll filter for semantic commit formats
# Look for patterns like: fix:, fix(scope):, Fix:, FIX:, fixes, fixed, etc.
result = subprocess.run(
['git', 'log', '--all', '--pretty=format:%H|%s'],
cwd=repo_path,
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
print(f" Warning: git log failed: {result.stderr}")
return file_fix_commits
# Parse commit hashes and check for semantic commit formats
fix_commit_hashes = []
for line in result.stdout.strip().split('\n'):
if '|' in line:
parts = line.split('|', 1)
commit_hash = parts[0].strip()
commit_msg = parts[1] if len(parts) > 1 else ''
commit_msg_lower = commit_msg.lower()
# Check for semantic commit formats:
# - fix: (conventional commits)
# - fix(scope): (conventional commits with scope)
# - Fix:, FIX: (case variations)
# - fixes #123, fix #123 (issue references)
# - fixed, fixing (verb forms)
# - bugfix, bug fix (variations)
is_fix_commit = (
commit_msg_lower.startswith('fix:') or
commit_msg_lower.startswith('fix(') or
commit_msg_lower.startswith('fixes') or
commit_msg_lower.startswith('fix ') or
commit_msg_lower.startswith('fixed') or
commit_msg_lower.startswith('fixing') or
'bugfix' in commit_msg_lower or
'bug fix' in commit_msg_lower or
(commit_msg.startswith('Fix:') and not commit_msg_lower.startswith('feature')) or
(commit_msg.startswith('FIX:') and not commit_msg_lower.startswith('feature'))
)
# Exclude false positives (like "prefix", "suffix", "affix", etc.)
if is_fix_commit and not any(word in commit_msg_lower for word in
['prefix', 'suffix', 'affix', 'transfix', 'crucifix']):
if len(commit_hash) == 40:
fix_commit_hashes.append(commit_hash)
print(f" Found {len(fix_commit_hashes)} fix commits")
# For each fix commit, get the files that were changed (parallelized)
def get_commit_files(commit_hash: str) -> List[str]:
"""Get Python files changed in a commit."""
try:
file_result = subprocess.run(
['git', 'show', '--name-only', '--pretty=format:', commit_hash],
cwd=repo_path,
capture_output=True,
text=True,
timeout=60
)
if file_result.returncode == 0:
return [f.strip() for f in file_result.stdout.strip().split('\n')
if f.strip() and f.strip().endswith('.py')]
except:
pass
return []
# Process commits in parallel
if fix_commit_hashes:
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(get_commit_files, commit_hash): commit_hash
for commit_hash in fix_commit_hashes}
for future in as_completed(futures):
commit_hash = futures[future]
try:
changed_files = future.result()
for file_path in changed_files:
if file_path not in file_fix_commits:
file_fix_commits[file_path] = set()
file_fix_commits[file_path].add(commit_hash)
except Exception as e:
print(f" Warning: Error processing commit {commit_hash[:8]}: {e}")
except Exception as e:
print(f" Warning: Error analyzing commits: {e}")
return file_fix_commits
def count_fixes_per_file(self, repo_path: str, code_metrics: List[Dict]) -> None:
"""
Count fix commits for each file in code_metrics.
Updates the metrics dictionaries in place.
"""
print(" Analyzing git commit logs for semantic fix commits...")
file_fix_commits = self.get_fix_commits(repo_path)
if not file_fix_commits:
print(" No fix commits found matching semantic commit formats")
# Set fix_count to 0 for all files
for metric in code_metrics:
metric['fix_count'] = 0
metric['total_fixes'] = 0
return
repo_base = Path(repo_path)
# Create a mapping from relative paths to fix counts
fix_counts: Dict[str, int] = {}
for file_path, commits in file_fix_commits.items():
# Try to normalize the path
try:
abs_path = Path(repo_path) / file_path
if abs_path.exists():
rel_path = abs_path.relative_to(repo_base)
fix_counts[str(rel_path)] = len(commits)
# Also store with forward slashes for matching
fix_counts[file_path] = len(commits)
except:
fix_counts[file_path] = len(commits)
print(f" Found {len(fix_counts)} files with fix commits")
# Match files to fix counts
for metric in tqdm(code_metrics, desc=" Matching files with fixes"):
relative_path = metric.get('relative_path', '')
filename = metric.get('filename', '')
file_path = metric.get('file_path', '')
fix_count = 0
# Try multiple matching strategies
if relative_path in fix_counts:
fix_count = fix_counts[relative_path]
elif filename in fix_counts:
fix_count = fix_counts[filename]
else:
# Try matching by filename in the fix_counts keys
for fix_file, count in fix_counts.items():
if filename in fix_file or relative_path in fix_file:
fix_count = max(fix_count, count)
metric['fix_count'] = fix_count
metric['total_fixes'] = fix_count # Alias for consistency
def collect_repository_data(self, owner: str, repo: str, parallel_files: bool = True, max_workers: Optional[int] = None) -> Optional[Dict]:
"""Collect all data for a repository."""
print(f"\nCollecting data for {owner}/{repo}...")
# Clone repository
repo_path = self.clone_repository(owner, repo)
if not repo_path:
return None
# Analyze code metrics (parallelized)
print(" Analyzing code metrics...")
code_metrics = self.code_analyzer.analyze_directory(repo_path, parallel=parallel_files, max_workers=max_workers)
if not code_metrics:
print(f" No Python files found in {owner}/{repo}")
return None
# Map file paths to relative paths for matching
repo_base = Path(repo_path)
for metric in code_metrics:
file_path = Path(metric['file_path'])
try:
relative_path = file_path.relative_to(repo_base)
metric['relative_path'] = str(relative_path).replace('\\', '/')
except:
metric['relative_path'] = metric['file_path']
# Count fix commits per file
self.count_fixes_per_file(repo_path, code_metrics)
# Get total fix commits count
total_fixes = sum(metric.get('fix_count', 0) for metric in code_metrics)
return {
'owner': owner,
'repo': repo,
'code_metrics': code_metrics,
'total_fixes': total_fixes
}
def cleanup(self):
"""Clean up temporary directories."""
if self.temp_dir and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
self.temp_dir = None
|