mirror of
https://github.com/JamesTheGiblet/BuddAI.git
synced 2026-01-08 21:58:40 +00:00
Add unit tests for analytics, fallback client, and refactored validators
- Implemented comprehensive unit tests for the BuddAI Analytics module, covering fallback statistics calculations. - Created tests for the FallbackClient to ensure proper escalation to various AI models and handling of missing API keys. - Developed unit tests for the refactored validator system, validating various hardware and coding standards. - Established a base validator interface and implemented specific validators for ESP32, Arduino, motor control, memory safety, and more. - Enhanced the validator registry to auto-discover and manage validators effectively. - Included detailed validation logic for common issues in embedded systems programming, such as unused variables, safety timeouts, and coding style violations.
This commit is contained in:
parent
99ef8f5592
commit
d4e09f6d13
43 changed files with 5036 additions and 622 deletions
63
validators/__init__.py
Normal file
63
validators/__init__.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
from typing import List, Dict
|
||||
|
||||
class BaseValidator:
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> List[Dict]:
|
||||
"""Return a list of issues found."""
|
||||
return []
|
||||
|
||||
def find_line(self, code: str, substring: str) -> int:
|
||||
for i, line in enumerate(code.splitlines(), 1):
|
||||
if substring in line:
|
||||
return i
|
||||
return -1
|
||||
|
||||
from .esp32_basics import ESP32Validator
|
||||
from .motor_control import MotorValidator
|
||||
from .servo_control import ServoValidator
|
||||
from .memory_safety import MemoryValidator
|
||||
from .forge_theory import ForgeTheoryValidator
|
||||
from .timing_safety import TimingValidator
|
||||
from .arduino_compat import ArduinoValidator
|
||||
from .style_guide import StyleValidator
|
||||
from typing import List, Dict
|
||||
|
||||
class BaseValidator:
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> List[Dict]:
|
||||
"""Return a list of issues found."""
|
||||
return []
|
||||
|
||||
def find_line(self, code: str, substring: str) -> int:
|
||||
for i, line in enumerate(code.splitlines(), 1):
|
||||
if substring in line:
|
||||
return i
|
||||
return -1
|
||||
|
||||
from .esp32_basics import ESP32Validator
|
||||
from .motor_control import MotorValidator
|
||||
from .servo_control import ServoValidator
|
||||
from .memory_safety import MemoryValidator
|
||||
from .forge_theory import ForgeTheoryValidator
|
||||
from .timing_safety import TimingValidator
|
||||
from .arduino_compat import ArduinoValidator
|
||||
from .style_guide import StyleValidator
|
||||
from typing import List, Dict
|
||||
|
||||
class BaseValidator:
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> List[Dict]:
|
||||
"""Return a list of issues found."""
|
||||
return []
|
||||
|
||||
def find_line(self, code: str, substring: str) -> int:
|
||||
for i, line in enumerate(code.splitlines(), 1):
|
||||
if substring in line:
|
||||
return i
|
||||
return -1
|
||||
|
||||
from .esp32_basics import ESP32Validator
|
||||
from .motor_control import MotorValidator
|
||||
from .servo_control import ServoValidator
|
||||
from .memory_safety import MemoryValidator
|
||||
from .forge_theory import ForgeTheoryValidator
|
||||
from .timing_safety import TimingValidator
|
||||
from .arduino_compat import ArduinoValidator
|
||||
from .style_guide import StyleValidator
|
||||
51
validators/arduino_compat.py
Normal file
51
validators/arduino_compat.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class ArduinoValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 9: Unnecessary Wire.h
|
||||
wire_include = re.search(r'#include\s+[<"]Wire\.h[>"]', code)
|
||||
if wire_include:
|
||||
rest_of_code = code.replace(wire_include.group(0), "")
|
||||
if not re.search(r'\bWire\b', rest_of_code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, wire_include.group(0)),
|
||||
"message": "Unnecessary #include <Wire.h> detected.",
|
||||
"fix": lambda c: re.sub(r'#include\s+[<"]Wire\.h[>"]', '// [Auto-Fix] Removed unnecessary Wire.h', c)
|
||||
})
|
||||
|
||||
# Check 12: Undefined Pin Constants
|
||||
pin_vars = set(re.findall(r'(?:digitalRead|digitalWrite|pinMode|ledcAttachPin)\s*\(\s*([a-zA-Z_]\w+)', code))
|
||||
for var in pin_vars:
|
||||
if var in ['LED_BUILTIN', 'HIGH', 'LOW', 'INPUT', 'OUTPUT', 'INPUT_PULLUP', 'true', 'false']:
|
||||
continue
|
||||
|
||||
is_defined = re.search(r'#define\s+' + re.escape(var) + r'\b', code) or \
|
||||
re.search(r'\b(?:const\s+)?(?:int|byte|uint8_t|short)\s+' + re.escape(var) + r'\s*=', code)
|
||||
|
||||
if not is_defined:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": f"Undefined variable '{var}' used in pin operation.",
|
||||
"fix": lambda c, v=var: f"#define {v} 2 // [Auto-Fix] Defined missing pin\n" + c
|
||||
})
|
||||
|
||||
# Check 25: Missing Serial.begin
|
||||
if re.search(r'Serial\.(?:print|write|println|printf)', code) and not re.search(r'Serial\.begin\s*\(', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing Serial.begin() initialization.",
|
||||
"fix": lambda c: re.sub(r'void\s+setup\s*\(\s*\)\s*\{', r'void setup() {\n Serial.begin(115200);', c, count=1)
|
||||
})
|
||||
|
||||
# Check 26: Missing Wire.begin
|
||||
if re.search(r'Wire\.(?!h\b|begin\b)', code) and not re.search(r'Wire\.begin\s*\(', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing Wire.begin() initialization for I2C.",
|
||||
"fix": lambda c: re.sub(r'void\s+setup\s*\(\s*\)\s*\{', r'void setup() {\n Wire.begin();', c, count=1)
|
||||
})
|
||||
return issues
|
||||
40
validators/base_validator.py
Normal file
40
validators/base_validator.py
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
"""
|
||||
Base validator interface
|
||||
All validators inherit from this
|
||||
"""
|
||||
|
||||
class BaseValidator:
|
||||
"""Base class for all validators"""
|
||||
|
||||
name = "Base Validator"
|
||||
triggers = [] # Keywords that activate this validator
|
||||
priority = 5 # 1=critical, 10=nice-to-have
|
||||
|
||||
def validate(self, code: str, context: dict) -> list:
|
||||
"""
|
||||
Validate code and return issues
|
||||
|
||||
Args:
|
||||
code: The code to validate
|
||||
context: Dict with hardware, libraries, etc.
|
||||
|
||||
Returns:
|
||||
List of issue dicts:
|
||||
{
|
||||
'severity': 'error|warning|info',
|
||||
'line': line_number or None,
|
||||
'message': 'What is wrong',
|
||||
'fix': 'How to fix it'
|
||||
}
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def matches_context(self, code: str, context: dict) -> bool:
|
||||
"""
|
||||
Check if this validator should run
|
||||
|
||||
Returns:
|
||||
bool: True if any trigger keyword in code
|
||||
"""
|
||||
code_lower = code.lower()
|
||||
return any(trigger.lower() in code_lower for trigger in self.triggers)
|
||||
37
validators/esp32_basics.py
Normal file
37
validators/esp32_basics.py
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class ESP32Validator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
if "ESP32" in hardware.upper():
|
||||
# Check 1: analogWrite
|
||||
if "analogWrite" in code:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, "analogWrite"),
|
||||
"message": "ESP32 doesn't support analogWrite(). Use ledcWrite()",
|
||||
"fix": lambda c: c.replace("analogWrite", "ledcWrite")
|
||||
})
|
||||
|
||||
# Check 18: ADC Resolution
|
||||
adc_res_match = re.search(r'#define\s+(\w*ADC\w*RES\w*)\s+(\d+)', code, re.IGNORECASE)
|
||||
if adc_res_match:
|
||||
val = int(adc_res_match.group(2))
|
||||
if val not in [4095, 4096]:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, adc_res_match.group(0)),
|
||||
"message": f"Hardware Mismatch: ESP32 ADC is 12-bit (4095), not {val}.",
|
||||
"fix": lambda c, old=adc_res_match.group(0), name=adc_res_match.group(1): c.replace(old, f"#define {name} 4095")
|
||||
})
|
||||
|
||||
# Check 20: Hardcoded 10-bit ADC math
|
||||
for match in re.finditer(r'/\s*(1023(?:\.0?)?f?|1024(?:\.0)f?)', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": "Hardware Mismatch: ESP32 ADC is 12-bit. Use 4095.0, not 1023/1024.",
|
||||
"fix": lambda c, m=match.group(0): c.replace(m, "/ 4095.0")
|
||||
})
|
||||
return issues
|
||||
6
validators/forge_theory.py
Normal file
6
validators/forge_theory.py
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
from . import BaseValidator
|
||||
|
||||
class ForgeTheoryValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
# Placeholder for exponential decay validation
|
||||
return []
|
||||
71
validators/memory_safety.py
Normal file
71
validators/memory_safety.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
|
||||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class MemoryValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 24: Unused Variables in Setup
|
||||
setup_match = re.search(r'void\s+setup\s*\(\s*\)\s*\{', code)
|
||||
if setup_match:
|
||||
start_idx = setup_match.end()
|
||||
brace_count = 1
|
||||
setup_body = ""
|
||||
for char in code[start_idx:]:
|
||||
if char == '{': brace_count += 1
|
||||
elif char == '}': brace_count -= 1
|
||||
if brace_count == 0: break
|
||||
setup_body += char
|
||||
|
||||
clean_body = re.sub(r'//.*', '', setup_body)
|
||||
clean_body = re.sub(r'/\*.*?\*/', '', clean_body, flags=re.DOTALL)
|
||||
|
||||
local_vars = re.finditer(r'\b((?:static\s+)?(?:const\s+)?(?:int|float|bool|char|String|long|double|byte|uint8_t|unsigned(?:\s+long)?))\s+([a-zA-Z_]\w*)\s*(?:=|;)', clean_body)
|
||||
|
||||
for match in local_vars:
|
||||
var_type = match.group(1)
|
||||
var_name = match.group(2)
|
||||
if len(re.findall(r'\b' + re.escape(var_name) + r'\b', clean_body)) == 1:
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"line": self.find_line(code, f"{var_type} {var_name}"),
|
||||
"message": f"Unused variable '{var_name}' in setup().",
|
||||
"fix": lambda c, v=var_name, t=var_type: re.sub(r'\b' + re.escape(t) + r'\s+' + re.escape(v) + r'[^;]*;\s*', '', c)
|
||||
})
|
||||
return issues
|
||||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class MemoryValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 24: Unused Variables in Setup
|
||||
setup_match = re.search(r'void\s+setup\s*\(\s*\)\s*\{', code)
|
||||
if setup_match:
|
||||
start_idx = setup_match.end()
|
||||
brace_count = 1
|
||||
setup_body = ""
|
||||
for char in code[start_idx:]:
|
||||
if char == '{': brace_count += 1
|
||||
elif char == '}': brace_count -= 1
|
||||
if brace_count == 0: break
|
||||
setup_body += char
|
||||
|
||||
clean_body = re.sub(r'//.*', '', setup_body)
|
||||
clean_body = re.sub(r'/\*.*?\*/', '', clean_body, flags=re.DOTALL)
|
||||
|
||||
local_vars = re.finditer(r'\b((?:static\s+)?(?:const\s+)?(?:int|float|bool|char|String|long|double|byte|uint8_t|unsigned(?:\s+long)?))\s+([a-zA-Z_]\w*)\s*(?:=|;)', clean_body)
|
||||
|
||||
for match in local_vars:
|
||||
var_type = match.group(1)
|
||||
var_name = match.group(2)
|
||||
if len(re.findall(r'\b' + re.escape(var_name) + r'\b', clean_body)) == 1:
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"line": self.find_line(code, f"{var_type} {var_name}"),
|
||||
"message": f"Unused variable '{var_name}' in setup().",
|
||||
"fix": lambda c, v=var_name, t=var_type: re.sub(r'\b' + re.escape(t) + r'\s+' + re.escape(v) + r'[^;]*;\s*', '', c)
|
||||
})
|
||||
return issues
|
||||
43
validators/motor_control.py
Normal file
43
validators/motor_control.py
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class MotorValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 4: L298N PWM Pin Misuse
|
||||
pwm_pins = re.findall(r'ledcAttachPin\s*\(\s*(\w+)\s*,', code)
|
||||
for pin in pwm_pins:
|
||||
if re.search(r'digitalWrite\s*\(\s*' + re.escape(pin) + r'\s*,', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, f"digitalWrite({pin}"),
|
||||
"message": f"Conflict: PWM pin '{pin}' used with digitalWrite(). Use ledcWrite() for speed control.",
|
||||
"fix": lambda c, p=pin: re.sub(r'digitalWrite\s*\(\s*' + re.escape(p) + r'\s*,\s*[^)]+\);?', f'// [Fixed] Removed conflicting digitalWrite on PWM pin {p}', c)
|
||||
})
|
||||
|
||||
# Check 8: Incomplete Motor Logic (L298N Validation)
|
||||
is_l298n_request = "l298n" in user_message.lower() or "dc motor" in user_message.lower() or ("motor" in user_message.lower() and "servo" not in user_message.lower())
|
||||
|
||||
if is_l298n_request:
|
||||
if not re.search(r'(?:#define|const\s+int)\s+\w*(?:IN1|IN2|DIR)\w*', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing L298N Direction Pins (IN1/IN2).",
|
||||
"fix": lambda c: "// [AUTO-FIX] L298N Definitions\n#define IN1 18\n#define IN2 19\n" + c
|
||||
})
|
||||
|
||||
if not re.search(r'(?:#define|const\s+int)\s+\w*(?:ENA|ENB|PWM)\w*', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing L298N PWM Pin (ENA).",
|
||||
"fix": lambda c: "#define ENA 21 // [AUTO-FIX] Missing PWM Pin\n" + c
|
||||
})
|
||||
|
||||
if "digitalWrite" not in code:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "L298N requires digitalWrite() for direction control.",
|
||||
"fix": lambda c: re.sub(r'(void\s+loop\s*\(\s*\)\s*\{)', r'\1\n // [AUTO-FIX] Set Direction\n digitalWrite(IN1, HIGH);\n digitalWrite(IN2, LOW);\n', c)
|
||||
})
|
||||
return issues
|
||||
63
validators/registry.py
Normal file
63
validators/registry.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
"""
|
||||
Validator Registry
|
||||
Auto-discovers and manages validators
|
||||
"""
|
||||
|
||||
import os
|
||||
import importlib
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from .base_validator import BaseValidator
|
||||
|
||||
class ValidatorRegistry:
|
||||
def __init__(self):
|
||||
self.validators = {}
|
||||
self.load_validators()
|
||||
|
||||
def load_validators(self):
|
||||
"""Auto-discover validators in validators/ folder"""
|
||||
validator_dir = Path(__file__).parent
|
||||
|
||||
for file in validator_dir.glob('*.py'):
|
||||
if file.name.startswith('_') or file.name == 'base_validator.py':
|
||||
continue
|
||||
|
||||
# Import the module
|
||||
module_name = file.stem
|
||||
module = importlib.import_module(f'.{module_name}', package='validators')
|
||||
|
||||
# Find validator classes
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
if (isinstance(attr, type) and
|
||||
issubclass(attr, BaseValidator) and
|
||||
attr != BaseValidator):
|
||||
|
||||
# Instantiate and register
|
||||
validator = attr()
|
||||
self.validators[validator.name] = validator
|
||||
print(f"✅ Loaded validator: {validator.name}")
|
||||
|
||||
def get_validators_for(self, code: str, context: dict) -> List[BaseValidator]:
|
||||
"""Get validators that match this code/context"""
|
||||
relevant = []
|
||||
|
||||
for validator in self.validators.values():
|
||||
if validator.matches_context(code, context):
|
||||
relevant.append(validator)
|
||||
|
||||
# Sort by priority (lower number = higher priority)
|
||||
relevant.sort(key=lambda v: v.priority)
|
||||
|
||||
return relevant
|
||||
|
||||
def validate_all(self, code: str, context: dict) -> list:
|
||||
"""Run all relevant validators"""
|
||||
validators = self.get_validators_for(code, context)
|
||||
all_issues = []
|
||||
|
||||
for validator in validators:
|
||||
issues = validator.validate(code, context)
|
||||
all_issues.extend(issues)
|
||||
|
||||
return all_issues
|
||||
23
validators/servo_control.py
Normal file
23
validators/servo_control.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class ServoValidator(BaseValidator):
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 14: State Machine for Weapons (Combat Protocol)
|
||||
if "weapon" in user_message.lower() or "combat" in user_message.lower() or "state machine" in user_message.lower():
|
||||
if "enum" not in code and "bool isArmed" not in code:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Combat code requires a State Machine (enum State or bool isArmed).",
|
||||
"fix": lambda c: c.replace("void setup", "\n// [AUTO-FIX] State Machine\nenum State { DISARMED, ARMING, ARMED, FIRING };\nState currentState = DISARMED;\nunsigned long stateTimer = 0;\n\nvoid setup") if "void setup" in c else "// [AUTO-FIX] State Machine\nenum State { DISARMED, ARMING, ARMED, FIRING };\nState currentState = DISARMED;\n" + c
|
||||
})
|
||||
|
||||
if "Serial.read" not in code and "Serial.available" not in code:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing Serial Command handling (e.g., 'A' to Arm).",
|
||||
"fix": lambda c: c.replace("void loop() {", "void loop() {\n if (Serial.available()) {\n char cmd = Serial.read();\n // Handle commands\n }\n")
|
||||
})
|
||||
return issues
|
||||
143
validators/style_guide.py
Normal file
143
validators/style_guide.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class StyleValidator(BaseValidator):
|
||||
def refactor_loop_to_function(self, code: str) -> str:
|
||||
loop_match = re.search(r'void\s+loop\s*\(\s*\)\s*\{', code)
|
||||
if not loop_match: return code
|
||||
|
||||
start_idx = loop_match.end()
|
||||
brace_count = 1
|
||||
loop_body_end = -1
|
||||
|
||||
for i, char in enumerate(code[start_idx:], start=start_idx):
|
||||
if char == '{': brace_count += 1
|
||||
elif char == '}': brace_count -= 1
|
||||
|
||||
if brace_count == 0:
|
||||
loop_body_end = i
|
||||
break
|
||||
|
||||
if loop_body_end == -1: return code
|
||||
|
||||
body = code[start_idx:loop_body_end]
|
||||
new_code = code[:start_idx] + "\n runSystemLogic();\n" + code[loop_body_end:]
|
||||
new_code += "\n\nvoid runSystemLogic() {" + body + "}\n"
|
||||
return new_code
|
||||
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 11: Feature Bloat (Unrequested Button)
|
||||
if user_message:
|
||||
msg_lower = user_message.lower()
|
||||
if not any(w in msg_lower for w in ['button', 'switch', 'input', 'trigger']):
|
||||
for match in re.finditer(r'(?:int|bool|byte)\s+(\w*(?:button|btn|switch)\w*)\s*=\s*digitalRead\s*\([^;]+;', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": f"Feature Bloat: Unrequested button code detected ('{match.group(1)}').",
|
||||
"fix": lambda c, m=match.group(0): c.replace(m, "")
|
||||
})
|
||||
|
||||
for match in re.finditer(r'digitalRead\s*\(\s*(\w*(?:BUTTON|BTN|SWITCH)\w*)\s*\)', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": f"Feature Bloat: Unrequested button check detected ('{match.group(1)}').",
|
||||
"fix": lambda c, m=match.group(0): c.replace(m, "0")
|
||||
})
|
||||
|
||||
for match in re.finditer(r'pinMode\s*\(\s*\w+\s*,\s*INPUT(?:_PULLUP)?\s*\);', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": "Feature Bloat: Unrequested input pin configuration.",
|
||||
"fix": lambda c, m=match.group(0): c.replace(m, "")
|
||||
})
|
||||
|
||||
for match in re.finditer(r'(?:int|bool|byte)\s+(\w*(?:button|btn|switch)\w*)\s*=\s*(?:LOW|HIGH|0|1|false|true)\s*;', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": f"Feature Bloat: Unused button variable '{match.group(1)}'.",
|
||||
"fix": lambda c, m=match.group(0): c.replace(m, "")
|
||||
})
|
||||
|
||||
# Check 15: Function Naming Conventions
|
||||
func_defs = re.finditer(r'\b(void|int|bool|float|double|String|char|long|unsigned(?:\s+long)?)\s+([a-zA-Z0-9_]+)\s*\(', code)
|
||||
for match in func_defs:
|
||||
func_name = match.group(2)
|
||||
if func_name in ['setup', 'loop', 'main']: continue
|
||||
|
||||
if not re.match(r'^[a-z][a-zA-Z0-9]*$', func_name):
|
||||
suggestion = func_name
|
||||
if '_' in func_name:
|
||||
components = func_name.split('_')
|
||||
suggestion = components[0].lower() + ''.join(x.title() for x in components[1:])
|
||||
elif func_name[0].isupper():
|
||||
suggestion = func_name[0].lower() + func_name[1:]
|
||||
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"line": self.find_line(code, match.group(0)),
|
||||
"message": f"Style: Function '{func_name}' should be camelCase (e.g., '{suggestion}').",
|
||||
"fix": lambda c, old=func_name, new=suggestion: c.replace(old, new)
|
||||
})
|
||||
|
||||
# Check 16: Monolithic Code Structure
|
||||
if "function" in user_message.lower() or "naming" in user_message.lower() or "modular" in user_message.lower():
|
||||
has_custom_funcs = False
|
||||
for match in re.finditer(r'\b(void|int|bool|float|double|String|char|long|unsigned(?:\s+long)?)\s+([a-zA-Z0-9_]+)\s*\(', code):
|
||||
if match.group(2) not in ['setup', 'loop', 'main']:
|
||||
has_custom_funcs = True
|
||||
break
|
||||
|
||||
if not has_custom_funcs:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Structure Violation: Request asked for functions but code is monolithic.",
|
||||
"fix": lambda c: c.replace("void loop() {", "void loop() {\n runSystemLogic();\n}\n\nvoid runSystemLogic() {") + "\n}"
|
||||
})
|
||||
|
||||
# Check 17: Loop Length
|
||||
if "function" in user_message.lower() or "naming" in user_message.lower() or "modular" in user_message.lower():
|
||||
loop_match = re.search(r'void\s+loop\s*\(\s*\)\s*\{', code)
|
||||
if loop_match:
|
||||
start_idx = loop_match.end()
|
||||
brace_count = 1
|
||||
loop_body = ""
|
||||
for char in code[start_idx:]:
|
||||
if char == '{': brace_count += 1
|
||||
elif char == '}': brace_count -= 1
|
||||
if brace_count == 0: break
|
||||
loop_body += char
|
||||
|
||||
lines = [line.strip() for line in loop_body.split('\n')]
|
||||
significant_lines = [l for l in lines if l and not l.startswith('//') and not l.startswith('/*') and l != '']
|
||||
|
||||
if len(significant_lines) >= 10:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": f"Modularity Violation: loop() has {len(significant_lines)} lines (limit 10). Move logic to functions.",
|
||||
"fix": lambda c: self.refactor_loop_to_function(c)
|
||||
})
|
||||
|
||||
# Check 21: Status LED Pattern
|
||||
if "status" in user_message.lower() and ("led" in user_message.lower() or "indicator" in user_message.lower()):
|
||||
breathing_match = re.search(r'(?:dutyCycle|brightness)\s*(\+=|\+\+|\-=|\-\-)', code)
|
||||
if breathing_match:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, breathing_match.group(0)),
|
||||
"message": "Wrong Pattern: Status indicators should use Blink Patterns (States), not Breathing/Fading.",
|
||||
"fix": lambda c: c + "\n// [Fix Required] Implement setStatusLED(LEDStatus state) instead of fading."
|
||||
})
|
||||
|
||||
if not re.search(r'enum\s+(?:StatusState|LEDStatus)\s*\{', code):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Missing Status Enum: Status LEDs require a state machine (enum LEDStatus {OFF, IDLE, ACTIVE, ERROR}).",
|
||||
"fix": lambda c: c.replace("void setup", "\n// [AUTO-FIX] Status Enum\nenum LEDStatus { OFF, IDLE, ACTIVE, ERROR };\nLEDStatus currentStatus = IDLE;\nunsigned long lastBlink = 0;\n\nvoid setup") if "void setup" in c else "// [AUTO-FIX] Status Enum\nenum LEDStatus { OFF, IDLE, ACTIVE, ERROR };\nLEDStatus currentStatus = IDLE;\nunsigned long lastBlink = 0;\n" + c
|
||||
})
|
||||
return issues
|
||||
108
validators/timing_safety.py
Normal file
108
validators/timing_safety.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
import re
|
||||
from . import BaseValidator
|
||||
|
||||
class TimingValidator(BaseValidator):
|
||||
def has_safety_timeout(self, code: str) -> bool:
|
||||
if "millis()" not in code: return False
|
||||
if re.search(r'>\s*[A-Z_]*TIMEOUT', code): return True
|
||||
if "DISARM" in code and "millis" in code and ">" in code: return True
|
||||
comparisons = re.findall(r'>\s*(\d+)', code)
|
||||
return any(int(val) > 500 for val in comparisons)
|
||||
|
||||
def validate(self, code: str, hardware: str, user_message: str) -> list[dict]:
|
||||
issues = []
|
||||
|
||||
# Check 2: Non-blocking code
|
||||
if "delay(" in code and "motor" in code.lower():
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"line": self.find_line(code, "delay"),
|
||||
"message": "Using delay() in motor code blocks safety checks",
|
||||
"fix": lambda c: c # No auto-fix
|
||||
})
|
||||
|
||||
# Check 3: Safety timeout
|
||||
if ("motor" in code.lower() or "servo" in code.lower()):
|
||||
if not self.has_safety_timeout(code):
|
||||
is_servo = "Servo" in code and "L298N" not in code
|
||||
stop_logic = " // STOP MOTORS\n ledcWrite(0, 0);\n ledcWrite(1, 0);"
|
||||
if is_servo:
|
||||
stop_logic = " // STOP SERVO\n // Implement safe position (e.g. myServo.write(90));"
|
||||
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"message": "Critical: No safety timeout detected (must be > 500ms).",
|
||||
"fix": lambda c, sl=stop_logic: "#define SAFETY_TIMEOUT 5000\nunsigned long lastCommand = 0;\n" + \
|
||||
re.sub(r'(void\s+loop\s*\(\s*\)\s*\{)', \
|
||||
rf'\1\n // [AUTO-FIX] Safety Timeout\n if (millis() - lastCommand > SAFETY_TIMEOUT) {{\n{sl}\n }}\n', c)
|
||||
})
|
||||
|
||||
# Check 5: Broken Debounce Logic
|
||||
bad_debounce = re.search(r'if\s*\(\s*\w+\s*[!=]=\s*\w*DebounceTime\s*\)', code)
|
||||
if bad_debounce:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, bad_debounce.group(0)),
|
||||
"message": "Type Mismatch: Comparing button state (int) with time (long).",
|
||||
"fix": lambda c: c.replace(bad_debounce.group(0), "if ((millis() - lastDebounceTime) > debounceDelay)")
|
||||
})
|
||||
|
||||
# Check 6: Safety Timeout Value
|
||||
timeout_match = re.search(r'#define\s+SAFETY_TIMEOUT\s+(\d+)', code)
|
||||
if timeout_match and int(timeout_match.group(1)) > 5000:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, timeout_match.group(0)),
|
||||
"message": f"Safety timeout {timeout_match.group(1)}ms is too long (Max: 5000ms).",
|
||||
"fix": lambda c: re.sub(r'(#define\s+SAFETY_TIMEOUT\s+)\d+', r'\g<1>5000', c)
|
||||
})
|
||||
|
||||
# Check 7: Broken Safety Timer Logic
|
||||
bad_static = re.search(r'static\s+unsigned\s+long\s+(\w+)\s*=\s*millis\(\);', code)
|
||||
if bad_static:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, bad_static.group(0)),
|
||||
"message": "Static timer initialized with millis() prevents reset. Initialize to 0.",
|
||||
"fix": lambda c: c.replace(bad_static.group(0), f"static unsigned long {bad_static.group(1)} = 0;")
|
||||
})
|
||||
|
||||
# Check 10: High-Frequency Serial Logging
|
||||
if ("Serial.print" in code or "Serial.write" in code) and \
|
||||
("motor" in code.lower() or "servo" in code.lower()):
|
||||
if not re.search(r'(print|log|debug|serial)\s*Timer', code, re.IGNORECASE) and \
|
||||
not re.search(r'last\s*(Print|Log|Debug)', code, re.IGNORECASE):
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"line": self.find_line(code, "Serial.print"),
|
||||
"message": "Serial logging in motor loops causes jitter. Ensure it's throttled (e.g. every 100ms).",
|
||||
"fix": lambda c: c + "\n// [Performance] Warning: Serial.print() inside loops can interrupt motor timing."
|
||||
})
|
||||
|
||||
# Check 19: Unnecessary Debouncing (Analog/Battery)
|
||||
if "battery" in user_message.lower() or "voltage" in user_message.lower() or "analog" in user_message.lower():
|
||||
if "button" not in user_message.lower():
|
||||
debounce_match = re.search(r'(?:debounce|lastDebounceTime)', code, re.IGNORECASE)
|
||||
if debounce_match:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, debounce_match.group(0)),
|
||||
"message": "Logic Error: Debouncing detected in analog/battery code. Analog sensors don't need debouncing.",
|
||||
"fix": lambda c: re.sub(r'.*debounce.*', '// [Fixed] Removed unnecessary debounce logic', c, flags=re.IGNORECASE)
|
||||
})
|
||||
|
||||
# Check 22: Misused Debouncing (Animation Timing)
|
||||
if "brightness" in code or "fade" in code:
|
||||
misused_debounce = re.search(r'if\s*\(\s*\(?\s*millis\(\)\s*-\s*\w+\s*\)?\s*>\s*(\w*DEBOUNCE\w*)\s*\)\s*\{', code, re.IGNORECASE)
|
||||
if misused_debounce:
|
||||
var_name = misused_debounce.group(1)
|
||||
start_index = misused_debounce.end()
|
||||
snippet = code[start_index:start_index+200]
|
||||
if any(x in snippet for x in ['brightness', 'fade', 'dutyCycle', 'ledcWrite']):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"line": self.find_line(code, var_name),
|
||||
"message": f"Semantic Error: Using {var_name} for animation/fading. Use UPDATE_INTERVAL or FADE_SPEED.",
|
||||
"fix": lambda c, v=var_name: c.replace(v, "FADE_SPEED" if v.isupper() else "fadeSpeed")
|
||||
})
|
||||
return issues
|
||||
Loading…
Add table
Add a link
Reference in a new issue