-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcode_validator.py
More file actions
546 lines (496 loc) · 20.7 KB
/
code_validator.py
File metadata and controls
546 lines (496 loc) · 20.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
# src/utils/code_validator.py
import ast
import hashlib
import json # Added for Bandit output parsing
import logging
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any
import pycodestyle
logger = logging.getLogger(__name__)
class CodeValidationError(Exception):
"""Custom exception for code validation errors."""
pass
def _run_pycodestyle(content: str, filename: str) -> list[dict[str, Any]]:
"""Runs pycodestyle on the given content using its library API."""
issues = []
try:
pycodestyle.StyleGuide(quiet=True, format="default")
checker = pycodestyle.Checker(
filename=filename, lines=content.splitlines(keepends=True)
)
errors = checker.check_all()
for line_num, col_num, code, message in errors:
issues.append(
{
"line_number": line_num,
"column_number": col_num,
"code": code,
"message": message.strip(),
"source": "pycodestyle",
"filename": filename,
"type": "PEP8 Violation",
}
)
except Exception as e:
logger.error(f"Error running pycodestyle on {filename}: {e}")
issues.append(
{
"line_number": None,
"column_number": None,
"code": "PYCODESTYLE_ERROR",
"message": f"Internal error during pycodestyle check: {e}",
"source": "pycodestyle",
"filename": filename,
"type": "Validation Tool Error",
}
)
return issues
def _run_bandit(content: str, filename: str) -> list[dict[str, Any]]:
"""Runs Bandit security analysis on the given content via subprocess."""
issues = []
try:
with tempfile.NamedTemporaryFile(
mode="w+", suffix=".py", encoding="utf-8", delete=True
) as temp_file:
temp_file.write(content)
temp_file.flush()
command = [
sys.executable,
"-m",
"bandit",
"-q",
"-f",
"json",
temp_file.name,
]
process = subprocess.run(
command,
capture_output=True,
text=True,
check=False,
shell=False,
timeout=30,
)
if process.returncode not in (0, 1):
logger.error(
f"Bandit execution failed for {filename} with return code {process.returncode}. Stderr: {process.stderr}"
)
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": f"Bandit failed: {process.stderr}",
}
)
else:
try:
bandit_results = process.stdout.strip()
if bandit_results:
data = json.loads(bandit_results)
for issue in data.get("results", []):
if issue["level"] != "info":
issues.append(
{
"type": "Bandit Security Issue",
"file": filename,
"line": issue.get("line_number"),
"code": issue.get("test_id"),
"message": f"[{issue.get('severity')}] {issue.get('description')}",
}
)
except json.JSONDecodeError as jde:
logger.error(
f"Failed to parse Bandit JSON output for {filename}: {jde}. Output: {process.stdout}"
)
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": f"Failed to parse Bandit output: {jde}",
}
)
except Exception as e:
logger.error(
f"Unexpected error processing Bandit output for {filename}: {e}"
)
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": f"Error processing Bandit output: {e}",
}
)
except FileNotFoundError:
logger.error(
"Bandit command not found. Ensure Bandit is installed and in the PATH."
)
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": "Bandit executable not found. Please install Bandit.",
}
)
except subprocess.TimeoutExpired:
logger.error(f"Bandit execution timed out for {filename}.")
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": "Bandit execution timed out.",
}
)
except Exception as e:
logger.error(f"Unexpected error running Bandit on {filename}: {e}")
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": f"Failed to run Bandit: {e}",
}
)
return issues
# --- NEW FUNCTION FOR IMPROVEMENT 2.2 (AST Security Checks) ---
def _run_ast_security_checks(content: str, filename: str) -> list[dict[str, Any]]:
"""Runs AST-based security checks on Python code."""
issues = []
try:
tree = ast.parse(content)
class SecurityPatternVisitor(ast.NodeVisitor):
def __init__(self, filename):
self.filename = filename
self.issues = []
def visit_Call(self, node):
# Check for eval() and exec()
if isinstance(node.func, ast.Name):
if node.func.id == "eval":
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "Use of eval() is discouraged due to security risks.",
}
)
elif node.func.id == "exec":
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "Use of exec() is discouraged due to security risks.",
}
)
# Check for subprocess.run with shell=True
if (
isinstance(node.func, ast.Attribute)
and isinstance(node.func.value, ast.Name)
and node.func.value.id == "subprocess"
and node.func.attr == "run"
):
for keyword in node.keywords:
if (
keyword.arg == "shell"
and isinstance(keyword.value, ast.Constant)
and keyword.value.value is True
):
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "subprocess.run() with shell=True is dangerous; consider shell=False and passing arguments as a list.",
}
)
# Check for pickle.load
if (
isinstance(node.func, ast.Attribute)
and isinstance(node.func.value, ast.Name)
and node.func.value.id == "pickle"
and node.func.attr == "load"
):
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "Use of pickle.load() with untrusted data is dangerous; it can execute arbitrary code.",
}
)
# Check for os.system
if (
isinstance(node.func, ast.Attribute)
and isinstance(node.func.value, ast.Name)
and node.func.value.id == "os"
and node.func.attr == "system"
):
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "Use of os.system() is discouraged; it can execute arbitrary commands and is prone to shell injection. Consider subprocess.run() with shell=False.",
}
)
# Check for XML External Entity (XXE) vulnerability in ElementTree
if (
isinstance(node.func, ast.Attribute)
and isinstance(node.func.value, ast.Name)
and node.func.value.id == "ET"
and node.func.attr == "fromstring"
):
has_parser_none = False
for keyword in node.keywords:
if (
keyword.arg == "parser"
and isinstance(keyword.value, ast.Constant)
and keyword.value.value is None
):
has_parser_none = True
break
if has_parser_none:
self.issues.append(
{
"type": "Security Vulnerability (AST)",
"file": self.filename,
"line": node.lineno,
"message": "xml.etree.ElementTree.fromstring() with parser=None is vulnerable to XML External Entity (XXE) attacks. Use a safe parser or disable DTDs.",
}
)
self.generic_visit(node)
visitor = SecurityPatternVisitor(filename)
visitor.visit(tree)
issues.extend(visitor.issues)
except SyntaxError as se:
issues.append(
{
"type": "Syntax Error",
"file": filename,
"line": se.lineno,
"column": se.offset,
"message": f"Invalid Python syntax: {se.msg}",
}
)
except Exception as e:
logger.error(f"Error during AST analysis for {filename}: {e}")
issues.append(
{
"type": "Validation Tool Error",
"file": filename,
"message": f"Failed during AST analysis: {e}",
}
)
return issues
# --- END NEW FUNCTION ---
def validate_code_output(
parsed_change: dict[str, Any], original_content: str = None
) -> dict[str, Any]:
"""Validates a single code change (ADD, MODIFY, REMOVE) for syntax, style, and security."""
file_path_str = parsed_change.get("FILE_PATH")
action = parsed_change.get("ACTION")
content_to_check = ""
issues = []
if not file_path_str or not action:
return {
"issues": [
{
"type": "Validation Error",
"file": file_path_str or "N/A",
"message": "Missing FILE_PATH or ACTION in parsed change.",
}
]
}
file_path = Path(file_path_str)
is_python = file_path.suffix.lower() == ".py"
if action == "ADD":
content_to_check = parsed_change.get("FULL_CONTENT", "")
checksum = hashlib.sha256(content_to_check.encode("utf-8")).hexdigest()
issues.append(
{
"type": "Content Integrity",
"file": file_path_str,
"message": f"New file SHA256: {checksum}",
}
)
if is_python:
issues.extend(_run_pycodestyle(content_to_check, file_path_str))
issues.extend(_run_bandit(content_to_check, file_path_str))
# Add AST-based security checks
issues.extend(_run_ast_security_checks(content_to_check, file_path_str))
elif action == "MODIFY":
content_to_check = parsed_change.get("FULL_CONTENT", "")
checksum_new = hashlib.sha256(content_to_check.encode("utf-8")).hexdigest()
issues.append(
{
"type": "Content Integrity",
"file": file_path_str,
"message": f"Modified file (new content) SHA256: {checksum_new}",
}
)
if original_content is not None:
original_checksum = hashlib.sha256(
original_content.encode("utf-8")
).hexdigest()
if checksum_new == original_checksum:
issues.append(
{
"type": "No Change Detected",
"file": file_path_str,
"message": "New content is identical to original.",
}
)
if is_python:
issues.extend(_run_pycodestyle(content_to_check, file_path_str))
issues.extend(_run_bandit(content_to_check, file_path_str))
issues.extend(_run_ast_security_checks(content_to_check, file_path_str))
elif is_python:
issues.extend(_run_pycodestyle(content_to_check, file_path_str))
issues.extend(_run_bandit(content_to_check, file_path_str))
issues.extend(_run_ast_security_checks(content_to_check, file_path_str))
elif action == "REMOVE":
if original_content is not None:
original_lines = original_content.splitlines()
lines_to_remove = parsed_change.get("LINES", [])
original_lines_set = set(original_lines)
for line_content_to_remove in lines_to_remove:
if line_content_to_remove not in original_lines_set:
issues.append(
{
"type": "Potential Removal Mismatch",
"file": file_path_str,
"message": f"Line intended for removal not found exactly in original content: '{line_content_to_remove[:80]}'",
}
)
else:
issues.append(
{
"type": "Validation Warning",
"file": file_path_str,
"message": "Original content not provided for REMOVE action validation.",
}
)
return {"issues": issues}
return {"issues": issues}
def validate_code_output_batch(
parsed_data: dict, original_contents: dict[str, str] = None
) -> dict[str, list[dict[str, Any]]]:
"""Validates a batch of code changes and aggregates issues per file."""
if original_contents is None:
original_contents = {}
all_validation_results = {}
if not isinstance(parsed_data, dict):
logger.error(
f"validate_code_output_batch received non-dictionary parsed_data: {type(parsed_data).__name__}"
)
malformed_blocks_content = []
if isinstance(parsed_data, str):
malformed_blocks_content.append(
f"Raw output that failed type check: {parsed_data[:500]}..."
)
elif parsed_data is not None:
malformed_blocks_content.append(
f"Unexpected type for parsed_data: {type(parsed_data).__name__}"
)
return {
"issues": [
{
"type": "Internal Error",
"file": "N/A",
"message": f"Invalid input type for parsed_data: Expected dict, got {type(parsed_data).__name__}",
}
],
"malformed_blocks": malformed_blocks_content,
}
code_changes_list = parsed_data.get("CODE_CHANGES", [])
if not isinstance(code_changes_list, list):
logger.error(
f"validate_code_output_batch received non-list 'CODE_CHANGES' field: {type(code_changes_list).__name__}"
)
return {
"issues": [
{
"type": "Internal Error",
"file": "N/A",
"message": f"Invalid type for 'CODE_CHANGES': Expected list, got {type(code_changes_list).__name__}",
}
],
"malformed_blocks": parsed_data.get("malformed_blocks", []),
}
for i, change_entry in enumerate(code_changes_list):
if not isinstance(change_entry, dict):
issue_message = f"Code change entry at index {i} is not a dictionary. Type: {type(change_entry).__name__}, Value: {str(change_entry)[:100]}"
logger.error(issue_message)
all_validation_results.setdefault("N/A", []).append(
{
"type": "Malformed Change Entry",
"file": "N/A",
"message": issue_message,
}
)
continue
file_path = change_entry.get("FILE_PATH")
if file_path:
try:
original_content = original_contents.get(file_path)
validation_result = validate_code_output(change_entry, original_content)
all_validation_results[file_path] = validation_result.get("issues", [])
logger.debug(
f"Validation for {file_path} completed with {len(validation_result.get('issues', []))} issues."
)
except Exception as e:
logger.error(
f"Error during validation of change entry {i} for file {file_path}: {e}"
)
if file_path not in all_validation_results:
all_validation_results[file_path] = []
all_validation_results[file_path].append(
{
"type": "Validation Tool Error",
"file": file_path,
"message": f"Failed to validate: {e}",
}
)
else:
logger.warning(
f"Encountered a code change without a 'FILE_PATH' in output {i}. Skipping validation for this item."
)
all_validation_results.setdefault("N/A", []).append(
{
"type": "Validation Error",
"file": "N/A",
"message": f"Change item at index {i} missing FILE_PATH.",
}
)
# --- New: Unit Test Presence Check ---
python_files_modified_or_added = {
change["FILE_PATH"]
for change in code_changes_list
if change.get("FILE_PATH", "").endswith(".py")
and change.get("ACTION") in ["ADD", "MODIFY"]
}
test_files_added = {
change["FILE_PATH"]
for change in code_changes_list
if change.get("FILE_PATH", "").startswith("tests/")
and change.get("ACTION") == "ADD"
}
for py_file in python_files_modified_or_added:
expected_test_file_prefix = f"tests/test_{Path(py_file).stem}"
if not any(
test_file.startswith(expected_test_file_prefix)
for test_file in test_files_added
):
all_validation_results.setdefault(py_file, []).append(
{
"type": "Missing Unit Test",
"file": py_file,
"message": f"No corresponding unit test file found for this Python change. Expected a file like '{expected_test_file_prefix}.py' in 'tests/'.",
}
)
logger.info(
f"Batch validation completed. Aggregated issues for {len(all_validation_results)} files."
)
return all_validation_results