mirror of
https://github.com/JamesTheGiblet/BuddAI.git
synced 2026-01-08 21:58:40 +00:00
- Added `ModelFineTuner` class for preparing training data and fine-tuning models based on user corrections. - Introduced `CodeValidator` class to validate generated code against various hardware and style rules, including safety checks and function naming conventions. - Developed skills for calculator operations, system information retrieval, weather fetching, and timer functionality. - Implemented a self-diagnostic skill to run unit tests and report results. - Created a dynamic skill loading mechanism to discover and register skills from the current directory. - Added unit tests for skills to ensure functionality and reliability.
310 lines
No EOL
13 KiB
Python
310 lines
No EOL
13 KiB
Python
import sqlite3
|
|
import re
|
|
from typing import List, Dict, Optional
|
|
from core.buddai_shared import DB_PATH, COMPLEX_TRIGGERS, MODULE_PATTERNS
|
|
|
|
class PromptEngine:
    """Handles prompt construction, hardware classification, and request analysis.

    The engine is stateless: every method works only from its arguments, the
    class-level keyword tables below, and the shared settings imported from
    ``core.buddai_shared`` (``DB_PATH``, ``COMPLEX_TRIGGERS``,
    ``MODULE_PATTERNS``).
    """

    # Keywords that flag each hardware category in a message. Matching is
    # plain substring containment on lower-cased text. There is no keyword
    # list for "sensor", so that flag is reported but never set.
    _HARDWARE_KEYWORDS = {
        "servo": ("servo", "mg996", "sg90"),
        "dc_motor": ("l298n", "dc motor", "motor driver", "motor control"),
        "button": ("button", "switch", "trigger"),
        "led": ("led", "light", "brightness", "indicator"),
        # 'state machine' is deliberately not a weapon keyword so that
        # abstract logic requests are not classified as weapon hardware.
        "weapon": ("weapon", "combat", "arming", "fire", "spinner", "flipper"),
    }

    # Abstract-logic keywords: they mark the message as self-contained (no
    # look-back into the conversation) without selecting any hardware.
    _LOGIC_KEYWORDS = (
        "state machine", "logic", "structure", "flow", "armed", "disarmed",
    )

    # Keywords that tie a learned rule to one hardware category.
    _RULE_KEYWORDS = {
        "servo": ("servo", "attach", "setperiodhertz"),
        "dc_motor": ("l298n", "in1", "in2", "motor driver"),
        "weapon": ("arming", "disarm", "fire", "combat"),
        "button": ("button", "switch", "debounce", "digitalread", "input_pullup"),
    }

    # Static rule sections spliced into the prompt when they apply.
    _L298N_RULES = (
        "\n"
        "- L298N WIRING RULES (MANDATORY):\n"
        "1. IN1/IN2 = Digital Output (Direction). Use digitalWrite().\n"
        "2. ENA = PWM Output (Speed). Use ledcWrite().\n"
        "3. To Move: IN1/IN2 must be OPPOSITE (HIGH/LOW).\n"
        "4. To Stop: IN1/IN2 both LOW.\n"
        "5. DO NOT treat Motors like Servos (No 'position' or 'angle').\n"
        "- SAFETY RULES (MANDATORY):\n"
        "1. Implement a safety timeout (e.g., 5000ms).\n"
        "2. Stop motors if no signal is received within timeout.\n"
        "3. Use millis() for non-blocking timing.\n"
    )
    _WEAPON_RULES = (
        "\n"
        "- COMBAT PROTOCOL (MANDATORY):\n"
        "1. LOGIC FOCUS: This is a State Machine request, NOT just servo movement.\n"
        "2. STATES: enum State { DISARMED, ARMING, ARMED, FIRING };\n"
        "3. TRANSITIONS: DISARMED -> ARMING (2s delay) -> ARMED -> FIRING.\n"
        "4. SAFETY: Auto-disarm after 10s idle. Fire only when ARMED.\n"
        "5. STRUCTURE: Use switch(currentState) { case ... } for logic.\n"
        "6. OUTPUTS: Control relays/LEDs/Motors based on state.\n"
    )
    _MODULARITY_RULE = (
        "\n"
        "- CODE STRUCTURE (MANDATORY):\n"
        "1. NO MONOLITHIC LOOP: Break code into small, descriptive functions.\n"
        "2. NAMING: Use camelCase for functions (e.g., readBatteryVoltage(), updateDisplay()).\n"
        "3. loop() must ONLY call these functions, not contain raw logic.\n"
    )
    _STATUS_LED_RULE = (
        "\n"
        "- STATUS LED RULES (MANDATORY):\n"
        "1. NO BREATHING/FADING: Do not use simple PWM fading loops.\n"
        "2. USE STATES: Define enum LEDStatus { OFF, IDLE, ACTIVE, ERROR };\n"
        "3. IMPLEMENTATION: Create void setStatusLED(LEDStatus state).\n"
        "4. PATTERNS: IDLE=Slow Blink, ACTIVE=Solid On, ERROR=Fast Blink.\n"
    )

    # Phrases that mark a message as a simple question or greeting.
    _SIMPLE_TRIGGERS = (
        "what is", "what's", "who is", "who's", "when is",
        "how do i", "can you explain", "tell me about",
        "what are", "where is", "hi", "hello", "hey",
        "good morning", "good evening",
    )
    # Word-boundary anchored so short triggers such as "hi" or "hey" do not
    # fire inside unrelated words ("this", "which", "they").
    _SIMPLE_TRIGGER_RE = re.compile(
        r"\b(?:" + "|".join(re.escape(t) for t in _SIMPLE_TRIGGERS) + r")\b"
    )
    # Words that signal a code-generation request. Kept as substring matches
    # so inflected forms ("creating", "builds") still count.
    _CODE_KEYWORDS = ("generate", "create", "write", "build", "code", "function")

    # Task description for each module name accepted by build_modular_plan.
    _MODULE_TASKS = {
        "ble": "BLE communication setup with phone app control",
        "servo": "Servo motor control for flipper/weapon",
        "motor": "Motor driver setup for movement (L298N)",
        "safety": "Safety timeout and failsafe systems",
        "battery": "Battery voltage monitoring",
        "sensor": "Sensor integration (distance/proximity)",
    }

    @staticmethod
    def _contains_any(text, keywords):
        """Return True if any keyword occurs as a substring of ``text``."""
        return any(word in text for word in keywords)

    def _flag_hardware(self, text, hardware):
        """Set the ``hardware`` flags whose keywords occur in ``text``.

        Returns True if at least one flag was set by this call.
        """
        found = False
        for name, keywords in self._HARDWARE_KEYWORDS.items():
            if self._contains_any(text, keywords):
                hardware[name] = True
                found = True
        return found

    def classify_hardware(self, user_message: str, context_messages: Optional[List[Dict]] = None) -> dict:
        """Detect what hardware this question is about.

        Args:
            user_message: The current user request.
            context_messages: Optional earlier conversation messages; each is
                indexed by ``'role'`` and ``'content'``.

        Returns:
            A dict of boolean flags keyed by ``servo``, ``dc_motor``,
            ``button``, ``led``, ``sensor`` and ``weapon``.
        """
        hardware = {
            "servo": False,
            "dc_motor": False,
            "button": False,
            "led": False,
            "sensor": False,
            "weapon": False,
        }

        msg_lower = user_message.lower()

        # 1. Check the current message first. A logic keyword also counts as
        #    "detected": it sets no hardware flag but blocks the look-back.
        detected_in_current = self._flag_hardware(msg_lower, hardware)
        if self._contains_any(msg_lower, self._LOGIC_KEYWORDS):
            detected_in_current = True

        # 2. Context switching: only look back when nothing was detected and
        #    the message is short (likely a follow-up such as "make it spin").
        if not detected_in_current and len(user_message.split()) < 10 and context_messages:
            recent = " ".join(
                m['content'].lower()
                for m in context_messages[-2:]
                if m['role'] == 'user'
            )
            self._flag_hardware(recent, hardware)

        return hardware

    def get_all_rules(self) -> List[str]:
        """Get all learned rules as text.

        Reads up to 50 ``rule_text`` values from the ``code_rules`` table of
        the database at ``DB_PATH``, ordered by ``confidence`` descending.

        Raises:
            sqlite3.Error: If the database cannot be opened or queried.
        """
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT rule_text FROM code_rules ORDER BY confidence DESC LIMIT 50")
            return [row[0] for row in cursor.fetchall()]
        finally:
            # Close even when the query fails so the connection never leaks.
            conn.close()

    def filter_rules_by_hardware(self, all_rules, hardware):
        """Only return rules relevant to detected hardware.

        Args:
            all_rules: Iterable of rule strings.
            hardware: Flag dict as produced by ``classify_hardware``; the
                ``servo``, ``dc_motor``, ``weapon`` and ``button`` keys are read.

        Returns:
            The rules that survive filtering, in their original order.
        """
        servo = hardware["servo"]
        motor = hardware["dc_motor"]
        weapon = hardware["weapon"]
        button = hardware["button"]
        has_specific_context = servo or motor or weapon or button

        kws = self._RULE_KEYWORDS
        relevant_rules = []

        for rule in all_rules:
            rule_lower = rule.lower()

            is_servo_rule = self._contains_any(rule_lower, kws["servo"])
            is_motor_rule = self._contains_any(rule_lower, kws["dc_motor"])
            is_weapon_rule = self._contains_any(rule_lower, kws["weapon"])
            is_button_rule = self._contains_any(rule_lower, kws["button"])

            if has_specific_context:
                # Strict filtering to avoid pattern over-application.
                # Servo rules are dropped from motor-only and weapon (logic)
                # requests unless a servo was explicitly requested.
                if is_servo_rule and not servo and (motor or weapon):
                    continue
                if is_motor_rule and servo and not motor:
                    continue
                if is_weapon_rule and not weapon:
                    continue
                if is_button_rule and not button:
                    continue
            elif is_servo_rule or is_motor_rule or is_weapon_rule or is_button_rule:
                # Generic context: exclude all hardware-specific rules.
                continue

            relevant_rules.append(rule)

        return relevant_rules

    def build_enhanced_prompt(self, user_message: str, hardware_detected: Optional[str] = None, context_messages: Optional[List[Dict]] = None) -> str:
        """Build prompt with FILTERED rules.

        Args:
            user_message: The current user request.
            hardware_detected: Label written verbatim after ``TARGET HARDWARE:``
                (``None`` is rendered as the text ``None``).
            context_messages: Optional earlier messages, forwarded to
                ``classify_hardware``.

        Returns:
            The complete prompt text.
        """
        hardware = self.classify_hardware(user_message, context_messages)
        relevant_rules = self.filter_rules_by_hardware(self.get_all_rules(), hardware)
        msg_lower = user_message.lower()

        # Headline labels for the detected hardware.
        hardware_context = []
        if hardware["servo"]:
            hardware_context.append("SERVO CONTROL")
        if hardware["dc_motor"]:
            hardware_context.append("DC MOTOR CONTROL")
        if hardware["button"]:
            hardware_context.append("BUTTON INPUTS")
        if hardware["led"]:
            hardware_context.append("LED STATUS")
        if hardware["weapon"]:
            hardware_context.append("WEAPON SYSTEM")

        l298n_rules = self._L298N_RULES if hardware["dc_motor"] else ""
        weapon_rules = self._WEAPON_RULES if hardware["weapon"] else ""

        # Anti-bloat rules: forbid hardware that was not requested.
        anti_bloat_rules = []
        if not hardware["button"]:
            anti_bloat_rules.append("- NO EXTRA INPUTS: Do NOT add buttons, switches, or digitalRead() unless explicitly requested.")
        if not hardware["servo"]:
            anti_bloat_rules.append("- NO EXTRA SERVOS: Do NOT add Servo objects or attach() unless explicitly requested.")
        if not hardware["dc_motor"]:
            anti_bloat_rules.append("- NO EXTRA MOTORS: Do NOT add motor driver code (L298N) unless explicitly requested.")
        anti_bloat = "\n".join(anti_bloat_rules)

        modularity_rule = ""
        if "function" in msg_lower or "naming" in msg_lower or "modular" in msg_lower:
            modularity_rule = self._MODULARITY_RULE

        status_led_rule = ""
        if hardware["led"] and ("status" in msg_lower or "indicator" in msg_lower):
            status_led_rule = self._STATUS_LED_RULE

        joined_context = ", ".join(hardware_context)
        active_modules = joined_context if hardware_context else "None (Logic Only)"

        # One entry per prompt line; "" entries are blank lines and the final
        # "" yields the trailing newline. The anti-bloat block appears twice,
        # as in the established prompt layout (presumably for emphasis).
        prompt_lines = [
            f"You are generating code for: {joined_context}",
            "You are an expert embedded developer.",
            f"TARGET HARDWARE: {hardware_detected}",
            f"ACTIVE MODULES: {active_modules}",
            "",
            "CRITICAL: Only use code patterns relevant to the hardware mentioned.",
            "STRICT NEGATIVE CONSTRAINTS (DO NOT IGNORE):",
            anti_bloat,
            "",
            "MANDATORY HARDWARE RULES:",
            l298n_rules,
            weapon_rules,
            status_led_rule,
            anti_bloat,
            modularity_rule,
            "",
            "GENERAL GUIDELINES:",
            "- If DC MOTOR: Use L298N patterns (digitalWrite, ledcWrite)",
            "- If SERVO: Use ESP32Servo patterns (attach, write)",
            "- DO NOT mix servo code into motor questions",
            "- DO NOT mix motor code into servo questions",
            "",
            "CRITICAL RULES (MUST FOLLOW):",
            "\n".join(relevant_rules),
            "",
            "USER REQUEST:",
            user_message,
            "",
            "Generate code following ALL rules above. Do not add unrequested features.",
            "FINAL CHECK:",
            "1. Did you add unrequested buttons? REMOVE THEM.",
            "2. Did you add unrequested servos? REMOVE THEM.",
            "3. Generate code ONLY for the hardware requested.",
            "",
        ]
        return "\n".join(prompt_lines)

    def is_simple_question(self, message: str) -> bool:
        """Check if this is a simple question that should use FAST model.

        A message is simple when it contains one of the question/greeting
        triggers as whole words and none of the code-generation keywords.
        Triggers are matched on word boundaries, so "hi" does not match
        inside "this" and "hey" does not match inside "they".
        """
        message_lower = message.lower()

        has_simple_trigger = self._SIMPLE_TRIGGER_RE.search(message_lower) is not None
        has_code_keyword = any(keyword in message_lower for keyword in self._CODE_KEYWORDS)

        return has_simple_trigger and not has_code_keyword

    def is_complex(self, message: str) -> bool:
        """Check if request is too complex and should be broken down.

        Complex means at least two ``COMPLEX_TRIGGERS`` phrases occur in the
        message, or at least three ``MODULE_PATTERNS`` modules are mentioned.
        """
        message_lower = message.lower()

        trigger_count = sum(1 for trigger in COMPLEX_TRIGGERS if trigger in message_lower)
        # Same matching as extract_modules, so the two can never disagree.
        module_count = len(self.extract_modules(message))

        return trigger_count >= 2 or module_count >= 3

    def extract_modules(self, message: str) -> List[str]:
        """Extract which modules are needed.

        Returns the ``MODULE_PATTERNS`` keys for which at least one keyword
        occurs (case-insensitively, as a substring) in ``message``.
        """
        message_lower = message.lower()
        return [
            module
            for module, keywords in MODULE_PATTERNS.items()
            if any(kw in message_lower for kw in keywords)
        ]

    def build_modular_plan(self, modules: List[str]) -> List[Dict[str, str]]:
        """Create a build plan from modules.

        Unknown module names are skipped. The plan always ends with an
        ``integration`` step, even when ``modules`` is empty.
        """
        plan = [
            {"module": module, "task": self._MODULE_TASKS[module]}
            for module in modules
            if module in self._MODULE_TASKS
        ]

        # Final step: tie the individual modules together.
        plan.append({
            "module": "integration",
            "task": "Integrate all modules into complete system",
        })

        return plan