diff --git a/CHANGELOG.md b/CHANGELOG.md index 46a87e0..56c8d4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,38 +1,59 @@ -# Changelog +# BuddAI Development Changelog -## [3.2.0] - 2025-12-29 +## Version 3.2 - Self-Learning & Optimization System -### Added +### šŸ“Š Phase 1: Data Collection -- WebSocket streaming for real-time token-by-token responses -- Multi-user support with session isolation -- Connection pooling for Ollama requests (10 connection pool) -- File upload validation (50MB limit, type checking) -- Zip slip protection for malicious archives -- Filename sanitization -- Type hints throughout codebase -- Enhanced iOS PWA support +Implemented comprehensive data logging to track user interactions and code quality. -### Security +- **Correction Logging**: Added `save_correction()` to store original vs. corrected code with user reasoning. +- **Compilation Logs**: Added `log_compilation_result()` to track hardware-specific compilation success rates. +- **Feedback System**: Enhanced `record_feedback()` to support comments and trigger failure analysis on negative feedback. +- **Database Updates**: Added tables for `corrections`, `compilation_log`, `feedback` (enhanced), and `code_rules`. -- File size limits enforced (50MB) -- Magic number validation for ZIP files -- Path traversal prevention in zip extraction -- Maximum upload file count (10 files) -- Sanitized filenames to prevent path injection +### šŸ”¬ Phase 2: Pattern Extraction -### Performance +Added intelligence to learn from the collected data. -- Connection pooling reduces latency by ~30% -- WebSocket streaming improves perceived response time -- Per-user instance management +- **Smart Learner**: Created `SmartLearner` class to diff code and extract patterns (e.g., `analogWrite` -> `ledcWrite`). +- **Hardware Profiles**: Created `HardwareProfile` class to manage hardware-specific syntax (ESP32 vs Arduino). +- **Rule Storage**: Learned patterns are stored in `code_rules` with confidence scores. +- **Prompt Injection**: `build_enhanced_prompt()` now dynamically injects high-confidence rules into the system prompt. -### Fixed +### āœ… Phase 3: Validation -- Session isolation bug (users can no longer see each other's data) -- Connection leak in Ollama requests -- Memory growth in long-running server instances +Implemented pre-flight checks to ensure code quality before display. -## [3.1.0] - 2025-12-29 +- **Code Validator**: Created `CodeValidator` class to check for: + - ESP32 PWM usage (ledcWrite enforcement). + - Blocking delays in motor code. + - Missing safety timeouts. +- **Auto-Fix**: The system can now automatically patch critical errors (like incorrect PWM calls) before showing code to the user. +- **Hardware Detection**: Automatically detects target hardware (ESP32, Arduino, Pico) from user prompts. -[Previous changelog content...] +### šŸ”„ Phase 4: Feedback Loop + +Established continuous improvement cycles. + +- **Adaptive Learner**: Created `AdaptiveLearner` to analyze session history for implicit corrections ("actually, use X") and preferences. +- **Session Analysis**: Added `/analyze` command to scan the current session for learned lessons. +- **Explicit Teaching**: Added `/teach ` command for manual rule insertion. + +### šŸ“ˆ Metrics & Fine-Tuning + +Added tools to measure and cement progress. + +- **Learning Metrics**: Created `LearningMetrics` to calculate accuracy trends and correction rates over 30 days. +- **Fine-Tuning Prep**: Created `ModelFineTuner` to export corrections into JSONL format for training local LLMs (Qwen). + +### šŸ›  New CLI Commands + +- `/learn`: Extract patterns from stored corrections. +- `/analyze`: Analyze current session for implicit feedback. +- `/correct `: Mark previous response as wrong and save correction. +- `/good`: Mark previous response as correct. +- `/teach `: Explicitly teach a coding rule. +- `/validate`: Run validation checks on the last response. +- `/rules`: Display currently learned rules. +- `/metrics`: Show accuracy and improvement stats. +- `/train`: Export training data for fine-tuning. diff --git a/main.py b/main.py index c1f83e5..0c9a0f6 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,7 @@ import os import json import logging import sqlite3 -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path import http.client import re # noqa: F401 @@ -24,6 +24,7 @@ import queue import socket import argparse import io +import difflib from urllib.parse import urlparse try: @@ -224,6 +225,408 @@ class ShadowSuggestionEngine: return suggestions +class SmartLearner: + """Extract patterns from corrections""" + + def analyze_corrections(self): + """Find common patterns in your fixes""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT original_code, corrected_code, reason + FROM corrections + """) + + corrections = cursor.fetchall() + patterns = [] + + for original, corrected, reason in corrections: + # Extract what changed + diff = self.diff_code(original, corrected) + + # Classify the change + if "analogWrite" in original and "ledcWrite" in corrected: + patterns.append({ + "rule": "ESP32 uses ledcWrite not analogWrite", + "find": "analogWrite", + "replace": "ledcWrite", + "hardware": "ESP32", + "confidence": 1.0 + }) + + if "delay(" in original and "millis()" in corrected: + patterns.append({ + "rule": "Use non-blocking millis() not delay()", + "find": "delay\\(", + "replace": "millis() based timing", + "confidence": 0.9 + }) + + # Store learned rules + self.save_rules(patterns) + return patterns + + def save_rules(self, patterns): + """Save to code_rules table""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS code_rules ( + id INTEGER PRIMARY KEY, + rule_text TEXT, + pattern_find TEXT, + pattern_replace TEXT, + context TEXT, + confidence FLOAT, + learned_from TEXT, + times_applied INTEGER DEFAULT 0 + ) + """) + + for p in patterns: + cursor.execute(""" + INSERT OR REPLACE INTO code_rules + (rule_text, pattern_find, pattern_replace, confidence, learned_from) + VALUES (?, ?, ?, ?, ?) + """, (p['rule'], p['find'], p['replace'], p['confidence'], 'corrections')) + + conn.commit() + conn.close() + + def diff_code(self, original: str, corrected: str) -> str: + """Generate a simple diff""" + return "\n".join(difflib.unified_diff( + original.splitlines(), + corrected.splitlines(), + fromfile='original', + tofile='corrected', + lineterm='' + )) + + +class HardwareProfile: + """Learn hardware-specific patterns""" + + ESP32_PATTERNS = { + "pwm_setup": { + "correct": "ledcSetup(channel, freq, resolution)", + "wrong": ["analogWrite", "pwmWrite"], + "learned_from": "James's corrections" + }, + "serial_baud": { + "preferred": 115200, + "alternatives": [9600, 57600], + "confidence": 1.0 + }, + "safety_timeout": { + "standard": 5000, + "pattern": "millis() - lastTime > TIMEOUT", + "confidence": 1.0 + } + } + + HARDWARE_KEYWORDS = { + "ESP32-C3": ["esp32", "esp32c3", "c3", "esp-32"], + "Arduino Uno": ["uno", "arduino uno", "atmega328p"], + "Raspberry Pi Pico": ["pico", "rp2040"] + } + + def detect_hardware(self, message: str) -> Optional[str]: + msg_lower = message.lower() + for hw, keywords in self.HARDWARE_KEYWORDS.items(): + if any(k in msg_lower for k in keywords): + return hw + return None + + def apply_hardware_rules(self, code: str, hardware: str) -> str: + """Apply known hardware patterns""" + if hardware == "ESP32-C3": + # Apply ESP32-specific fixes + code = self.fix_pwm(code) + code = self.fix_serial(code) + code = self.add_safety(code) + return code + + def fix_pwm(self, code: str) -> str: + for wrong in self.ESP32_PATTERNS["pwm_setup"]["wrong"]: + if wrong in code: + if wrong == "analogWrite": + code = code.replace("analogWrite", "ledcWrite") + return code + + def fix_serial(self, code: str) -> str: + preferred = self.ESP32_PATTERNS["serial_baud"]["preferred"] + return re.sub(r'Serial\.begin\(\s*\d+\s*\)', f'Serial.begin({preferred})', code) + + def add_safety(self, code: str) -> str: + if "motor" in code.lower() and "millis()" not in code: + code += "\n// [BuddAI Safety] Warning: No non-blocking timeout detected. Consider adding safety timeout." + return code + + +class CodeValidator: + """Validate generated code before showing to user""" + + def find_line(self, code: str, substring: str) -> int: + for i, line in enumerate(code.splitlines(), 1): + if substring in line: + return i + return -1 + + def has_safety_timeout(self, code: str) -> bool: + return "millis()" in code and ("-" in code or ">" in code) + + def matches_style(self, code: str) -> bool: + # Placeholder for style matching logic + return True + + def apply_style(self, code: str) -> str: + # Placeholder for style application + return code + + def validate(self, code: str, hardware: str) -> Tuple[bool, List[Dict]]: + """Check code against known rules""" + issues = [] + + # Check 1: ESP32 PWM + if "ESP32" in hardware.upper(): + if "analogWrite" in code: + issues.append({ + "severity": "error", + "line": self.find_line(code, "analogWrite"), + "message": "ESP32 doesn't support analogWrite(). Use ledcWrite()", + "fix": lambda c: c.replace("analogWrite", "ledcWrite") + }) + + # Check 2: Non-blocking code + if "delay(" in code and "motor" in code.lower(): + issues.append({ + "severity": "warning", + "line": self.find_line(code, "delay"), + "message": "Using delay() in motor code blocks safety checks", + "fix": lambda c: c # No auto-fix + }) + + # Check 3: Safety timeout + if "motor" in code.lower() or "servo" in code.lower(): + if not self.has_safety_timeout(code): + issues.append({ + "severity": "warning", + "message": "No safety timeout detected", + "fix": lambda c: c + "\n// [BuddAI Safety] Warning: No safety timeout detected." + }) + + return len([i for i in issues if i['severity'] == 'error']) == 0, issues + + def auto_fix(self, code: str, issues: List[Dict]) -> str: + """Automatically fix known issues""" + fixed_code = code + + for issue in issues: + if 'fix' in issue and issue['severity'] == 'error': + fixed_code = issue['fix'](fixed_code) + + return fixed_code + + +class AdaptiveLearner: + """Learn from every interaction""" + + def learn_from_session(self, session_id: str): + """Analyze what worked/failed in a session""" + print(f"🧠 Adaptive Learning: Analyzing Session {session_id}...") + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Get all messages in session + cursor.execute(""" + SELECT id, role, content + FROM messages + WHERE session_id = ? + ORDER BY id ASC + """, (session_id,)) + + messages = cursor.fetchall() + conn.close() + + count = 0 + # Look for correction patterns + for i, (msg_id, role, content) in enumerate(messages): + if role == 'user' and i > 0: + prev_msg = messages[i-1] + prev_role = prev_msg[1] + prev_content = prev_msg[2] + + if prev_role == 'assistant': + # Did James correct the previous response? + if self.is_correction(content, prev_content): + print(f" - Detected correction in msg #{msg_id}") + self.learn_correction(prev_content, content) + count += 1 + + # Did James ask for modification? + if self.is_modification(content): + print(f" - Detected preference in msg #{msg_id}") + self.learn_preference(content) + count += 1 + + if count == 0: + print(" - No obvious corrections found.") + + def is_correction(self, user_msg: str, ai_msg: str) -> bool: + """Detect if user is correcting AI""" + correction_signals = [ + "actually", "no,", "wrong", "should be", "instead of", + "not", "use", "don't use", "change", "fix", "error", "bug" + ] + return any(signal in user_msg.lower() for signal in correction_signals) + + def is_modification(self, user_msg: str) -> bool: + """Detect if user is expressing a preference""" + signals = ["prefer", "i like", "always use", "style", "better", "make it"] + return any(s in user_msg.lower() for s in signals) + + def learn_correction(self, original: str, correction: str): + """Extract the lesson from a correction""" + # Save the rule (Generic capture for now) + rule_text = correction.split('\n')[0][:100] + self.save_rule(rule_text, "context_dependent", correction[:100], confidence=0.5) + + def learn_preference(self, content: str): + """Extract preference""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO style_preferences (user_id, category, preference, confidence, extracted_at) + VALUES (?, ?, ?, ?, ?) + """, ("default", "learned_preference", content[:200], 0.6, datetime.now().isoformat())) + conn.commit() + conn.close() + + def save_rule(self, rule_text, find, replace, confidence): + """Save to code_rules table""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO code_rules + (rule_text, pattern_find, pattern_replace, confidence, learned_from) + VALUES (?, ?, ?, ?, ?) + """, (rule_text, find, replace, confidence, 'adaptive_session')) + conn.commit() + conn.close() + + +class LearningMetrics: + """Measure BuddAI's improvement over time""" + + def calculate_accuracy(self): + """What % of code is accepted without correction?""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat() + + cursor.execute(""" + SELECT + COUNT(*) as total_responses, + COUNT(CASE WHEN f.positive = 1 THEN 1 END) as positive_feedback, + COUNT(CASE WHEN c.id IS NOT NULL THEN 1 END) as corrected + FROM messages m + LEFT JOIN feedback f ON m.id = f.message_id + LEFT JOIN corrections c ON m.content LIKE '%' || c.original_code || '%' + WHERE m.role = 'assistant' + AND m.timestamp > ? + """, (thirty_days_ago,)) + + total, positive, corrected = cursor.fetchone() + conn.close() + + accuracy = (positive / total) * 100 if total and total > 0 else 0 + correction_rate = (corrected / total) * 100 if total and total > 0 else 0 + + return { + "accuracy": accuracy, + "correction_rate": correction_rate, + "improvement": self.calculate_trend() + } + + def calculate_trend(self): + """Is BuddAI getting better over time?""" + # Compare last 7 days vs previous 7 days + recent = self.get_accuracy_for_period(7) + previous = self.get_accuracy_for_period(7, offset=7) + + improvement = recent - previous + return f"+{improvement:.1f}%" if improvement > 0 else f"{improvement:.1f}%" + + def get_accuracy_for_period(self, days: int, offset: int = 0) -> float: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + start_dt = (datetime.now() - timedelta(days=days + offset)).isoformat() + end_dt = (datetime.now() - timedelta(days=offset)).isoformat() + + cursor.execute(""" + SELECT + COUNT(*) as total, + COUNT(CASE WHEN f.positive = 1 THEN 1 END) as positive + FROM messages m + LEFT JOIN feedback f ON m.id = f.message_id + WHERE m.role = 'assistant' + AND m.timestamp BETWEEN ? AND ? + """, (start_dt, end_dt)) + + row = cursor.fetchone() + conn.close() + + if not row: + return 0.0 + + total, positive = row + return (positive / total) * 100 if total and total > 0 else 0.0 + + +class ModelFineTuner: + """Fine-tune local model on YOUR corrections""" + + def prepare_training_data(self): + """Convert corrections to training format""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT original_code, corrected_code, reason + FROM corrections + """) + + training_data = [] + for original, corrected, reason in cursor.fetchall(): + training_data.append({ + "prompt": f"Generate code for: {reason}", + "completion": corrected, + "negative_example": original + }) + + conn.close() + + # Save as JSONL for fine-tuning + output_path = DATA_DIR / 'training_data.jsonl' + with open(output_path, 'w', encoding='utf-8') as f: + for item in training_data: + f.write(json.dumps(item) + '\n') + return f"Exported {len(training_data)} examples to {output_path}" + + def fine_tune_model(self): + """Fine-tune Qwen on your corrections""" + # This requires: + # 1. Export training data + # 2. Use Ollama modelfile or external training + # 3. Create custom model: qwen2.5-coder-james:3b + pass + + class BuddAI: """Executive with task breakdown""" @@ -306,6 +709,13 @@ class BuddAI: self.server_mode = server_mode self.context_messages = [] self.shadow_engine = ShadowSuggestionEngine(DB_PATH, self.user_id) + self.learner = SmartLearner() + self.hardware_profile = HardwareProfile() + self.current_hardware = "ESP32-C3" + self.validator = CodeValidator() + self.adaptive_learner = AdaptiveLearner() + self.metrics = LearningMetrics() + self.fine_tuner = ModelFineTuner() print("BuddAI Executive v3.1 - Modular Builder") print("=" * 50) @@ -394,6 +804,46 @@ class BuddAI: ) """) + try: + cursor.execute("ALTER TABLE feedback ADD COLUMN comment TEXT") + except sqlite3.OperationalError: + pass + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS corrections ( + id INTEGER PRIMARY KEY, + timestamp TEXT, + original_code TEXT, + corrected_code TEXT, + reason TEXT, + context TEXT + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS compilation_log ( + id INTEGER PRIMARY KEY, + timestamp TEXT, + code TEXT, + success BOOLEAN, + errors TEXT, + hardware TEXT + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS code_rules ( + id INTEGER PRIMARY KEY, + rule_text TEXT, + pattern_find TEXT, + pattern_replace TEXT, + context TEXT, + confidence FLOAT, + learned_from TEXT, + times_applied INTEGER DEFAULT 0 + ) + """) + conn.commit() conn.close() @@ -593,6 +1043,140 @@ class BuddAI: conn.close() print(f"\nāœ… Style Signature Updated:\n{summary}\n") + def get_recent_context(self, limit: int = 5) -> str: + """Get recent chat context as a string""" + return json.dumps(self.context_messages[-limit:]) + + def save_correction(self, original_code: str, corrected_code: str, reason: str): + """Store when James fixes BuddAI's code""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS corrections ( + id INTEGER PRIMARY KEY, + timestamp TEXT, + original_code TEXT, + corrected_code TEXT, + reason TEXT, + context TEXT + ) + """) + + cursor.execute(""" + INSERT INTO corrections + (timestamp, original_code, corrected_code, reason, context) + VALUES (?, ?, ?, ?, ?) + """, ( + datetime.now().isoformat(), + original_code, + corrected_code, + reason, + self.get_recent_context() + )) + + conn.commit() + conn.close() + + def detect_hardware(self, message: str) -> str: + """Wrapper to detect hardware from message or return current default""" + hw = self.hardware_profile.detect_hardware(message) + return hw if hw else self.current_hardware + + def get_applicable_rules(self, user_message: str) -> List[Dict]: + """Get rules relevant to the user message""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + # Fetch rules with reasonable confidence + cursor.execute("SELECT rule_text, confidence FROM code_rules WHERE confidence > 0.6 ORDER BY confidence DESC") + rows = cursor.fetchall() + conn.close() + return [{"rule_text": r[0], "confidence": r[1]} for r in rows] + + def get_style_summary(self) -> str: + """Get summary of learned style preferences""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT category, preference FROM style_preferences WHERE confidence > 0.6") + rows = cursor.fetchall() + conn.close() + if not rows: + return "Standard coding style." + return ", ".join([f"{r[0]}: {r[1]}" for r in rows]) + + def build_enhanced_prompt(self, user_message: str) -> str: + """Inject learned rules into prompt""" + + # Get relevant rules + rules = self.get_applicable_rules(user_message) + + # Build enhanced system prompt + enhanced_prompt = f"""You are BuddAI, James's coding partner. + +CRITICAL RULES (learned from James's corrections): +""" + + for rule in rules: + confidence = "āœ“āœ“āœ“" if rule['confidence'] > 0.9 else "āœ“āœ“" if rule['confidence'] > 0.7 else "āœ“" + enhanced_prompt += f"{confidence} {rule['rule_text']}\n" + + enhanced_prompt += f""" + +HARDWARE CONTEXT: {self.detect_hardware(user_message)} +STYLE PREFERENCES: {self.get_style_summary()} + +USER REQUEST: +{user_message} + +Generate code following the rules above. If unsure, ask for clarification. +""" + + return enhanced_prompt + + def teach_rule(self, rule_text: str): + """Explicitly save a user-taught rule""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + INSERT INTO code_rules + (rule_text, pattern_find, pattern_replace, confidence, learned_from) + VALUES (?, ?, ?, ?, ?) + """, (rule_text, "", "", 1.0, 'user_taught')) + conn.commit() + conn.close() + + def log_compilation_result(self, code: str, success: bool, errors: str = ""): + """Track what compiles vs what fails""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS compilation_log ( + id INTEGER PRIMARY KEY, + timestamp TEXT, + code TEXT, + success BOOLEAN, + errors TEXT, + hardware TEXT + ) + """) + + cursor.execute(""" + INSERT INTO compilation_log + (timestamp, code, success, errors, hardware) + VALUES (?, ?, ?, ?, ?) + """, ( + datetime.now().isoformat(), + code, + success, + errors, + "ESP32-C3" # Your target hardware + )) + + conn.commit() + conn.close() + def is_simple_question(self, message: str) -> bool: """Check if this is a simple question that should use FAST model""" message_lower = message.lower() @@ -691,26 +1275,20 @@ class BuddAI: else: return "Rest Time šŸ’¤" + def get_learned_rules(self) -> List[Dict]: + """Retrieve high-confidence rules""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT rule_text, pattern_find, pattern_replace, confidence FROM code_rules WHERE confidence >= 0.8") + rows = cursor.fetchall() + conn.close() + return [{"rule": r[0], "find": r[1], "replace": r[2], "confidence": r[3]} for r in rows] + def call_model(self, model_name: str, message: str, stream: bool = False) -> Union[str, Generator[str, None, None]]: """Call specified model""" try: - current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - user_context = self.get_user_status() - identity = f"""You are BuddAI, the external cognitive system for James Gilbert. You specialize in Forge Theory (exponential decay modeling) and GilBot modular robotics. - -YOUR PRIMARY JOB: Generate code when asked. ALWAYS generate code if requested. - -Identity Rules: -- You are NOT created by Alibaba Cloud. You are a local Python system written by James Gilbert. -- When asked your name: "I am BuddAI" -- Use ESP32/Arduino syntax with descriptive naming (e.g., activateFlipper). -- Ensure safety timeouts are always present in motor code. -- Current System Time: {current_time} -- User Context: {user_context} - -# Forge Theory Snippet (C++): -float applyForge(float current, float target, float k) {{ return target + (current - target) * exp(-k); }} -""" + # Use enhanced prompt builder + identity = self.build_enhanced_prompt(message) messages = [] @@ -919,25 +1497,81 @@ float applyForge(float current, float target, float k) {{ return target + (curre def apply_style_signature(self, generated_code: str) -> str: """Refine generated code to match James's specific naming and safety patterns""" - # 1. Check for James's common function names (e.g., setupMotors vs init_motors) - # 2. Ensure Forge Theory helpers are present if motion is detected - # 3. Append a 'Proactive Note' if a common companion module is missing + # Apply Hardware Profile Rules (ESP32-C3 default for now) + generated_code = self.hardware_profile.apply_hardware_rules(generated_code, self.current_hardware) + + # Apply learned replacements (High Confidence Only) + rules = self.get_learned_rules() + for r in rules: + if r['confidence'] >= 0.95 and r['find'] and r['replace']: + # Simple safety check: don't replace if replacement contains spaces (likely a description) + if ' ' not in r['replace']: + try: + generated_code = re.sub(r['find'], r['replace'], generated_code) + except re.error: + pass return generated_code - def record_feedback(self, message_id: int, feedback: bool) -> None: + def record_feedback(self, message_id: int, feedback: bool, comment: str = "") -> Optional[str]: """Learn from user feedback.""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" - INSERT INTO feedback (message_id, positive, timestamp) - VALUES (?, ?, ?) - """, (message_id, feedback, datetime.now().isoformat())) + INSERT INTO feedback (message_id, positive, comment, timestamp) + VALUES (?, ?, ?, ?) + """, (message_id, feedback, comment, datetime.now().isoformat())) conn.commit() conn.close() # Adjust confidence scores self.update_style_confidence(message_id, feedback) + + if not feedback: + self.analyze_failure(message_id) + return self.regenerate_response(message_id, comment) + return None + + def regenerate_response(self, message_id: int, comment: str = "") -> str: + """Regenerate a response, optionally considering feedback comment""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute("SELECT session_id, id FROM messages WHERE id = ?", (message_id,)) + row = cursor.fetchone() + if not row: + conn.close() + return "Error: Message not found." + + session_id, current_id = row + + cursor.execute( + "SELECT content FROM messages WHERE session_id = ? AND id < ? AND role = 'user' ORDER BY id DESC LIMIT 1", + (session_id, current_id) + ) + user_row = cursor.fetchone() + conn.close() + + if user_row: + prompt = user_row[0] + if comment: + prompt += f"\n\n[Feedback: {comment}]" + + print(f"šŸ”„ Regenerating: {prompt[:50]}...") + return self.chat(prompt) + return "Error: Original prompt not found." + + def analyze_failure(self, message_id: int) -> None: + """Analyze why a message received negative feedback""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT content FROM messages WHERE id = ?", (message_id,)) + row = cursor.fetchone() + conn.close() + + if row: + print(f"\nāš ļø Negative Feedback on Message #{message_id}") + print(f" Content: {row[0][:100]}...") def update_style_confidence(self, message_id: int, positive: bool) -> None: """Adjust confidence of style preferences based on feedback.""" @@ -972,6 +1606,11 @@ float applyForge(float current, float target, float k) {{ return target + (curre def chat_stream(self, user_message: str, force_model: Optional[str] = None, forge_mode: str = "2") -> Generator[str, None, None]: """Streaming version of chat""" + # Detect Hardware Context + detected_hw = self.hardware_profile.detect_hardware(user_message) + if detected_hw: + self.current_hardware = detected_hw + style_context = self.retrieve_style_context(user_message) if style_context: self.context_messages.append({"role": "system", "content": style_context}) @@ -1014,9 +1653,19 @@ float applyForge(float current, float target, float k) {{ return target + (curre self.last_generated_id = msg_id self.context_messages.append({"id": msg_id, "role": "assistant", "content": full_response, "timestamp": datetime.now().isoformat()}) + def extract_code(self, text: str) -> List[str]: + """Extract code blocks from markdown""" + return re.findall(r'```(?:\w+)?\n(.*?)```', text, re.DOTALL) + # --- Main Chat Method --- def chat(self, user_message: str, force_model: Optional[str] = None, forge_mode: str = "2") -> str: """Main chat with smart routing and shadow suggestions""" + # Detect Hardware Context + detected_hw = self.hardware_profile.detect_hardware(user_message) + if detected_hw: + self.current_hardware = detected_hw + print(f"šŸ”§ Target Hardware Detected: {self.current_hardware}") + style_context = self.retrieve_style_context(user_message) if style_context: self.context_messages.append({"role": "system", "content": style_context}) @@ -1039,6 +1688,24 @@ float applyForge(float current, float target, float k) {{ return target + (curre # Apply Style Guard response = self.apply_style_signature(response) + # Extract code blocks + code_blocks = self.extract_code(response) + + # Validate each code block + for code in code_blocks: + valid, issues = self.validator.validate(code, self.current_hardware) + + if not valid: + # Auto-fix critical issues + fixed_code = self.validator.auto_fix(code, issues) + response = response.replace(code, fixed_code) + + # Append explanation + response += "\n\nāš ļø **Auto-corrected:**\n" + for issue in issues: + if issue['severity'] == 'error': + response += f"- {issue['message']}\n" + # Generate Suggestion Bar suggestions = self.shadow_engine.get_all_suggestions(user_message, response) if suggestions: @@ -1223,6 +1890,28 @@ float applyForge(float current, float target, float k) {{ return target + (curre return session_id + def create_backup(self) -> Tuple[bool, str]: + """Create a safe backup of the database""" + if not DB_PATH.exists(): + return False, "Database file not found." + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_dir = DATA_DIR / "backups" + backup_dir.mkdir(exist_ok=True) + backup_path = backup_dir / f"conversations_{timestamp}.db" + + try: + # Use SQLite backup API for consistency + src = sqlite3.connect(DB_PATH) + dst = sqlite3.connect(backup_path) + with dst: + src.backup(dst) + dst.close() + src.close() + return True, str(backup_path) + except Exception as e: + return False, str(e) + def run(self) -> None: """Main loop""" try: @@ -1251,7 +1940,17 @@ float applyForge(float current, float target, float k) {{ return target + (curre print("/balanced - Use balanced model") print("/index - Index local repositories") print("/scan - Scan style signature (V3.0)") + print("/learn - Extract patterns from corrections") + print("/analyze - Analyze session for implicit feedback") + print("/correct - Mark previous response wrong") + print("/good - Mark previous response correct") + print("/teach - Explicitly teach a rule") + print("/validate - Re-validate last response") + print("/rules - Show learned rules") + print("/metrics - Show improvement stats") + print("/train - Export corrections for fine-tuning") print("/save - Export chat to Markdown") + print("/backup - Backup database") print("/help - This message") print("exit - End session\n") continue @@ -1265,6 +1964,110 @@ float applyForge(float current, float target, float k) {{ return target + (curre elif cmd == '/scan': self.scan_style_signature() continue + elif cmd == '/learn': + print("🧠 Analyzing corrections for patterns...") + patterns = self.learner.analyze_corrections() + if patterns: + print(f"āœ… Learned {len(patterns)} new rules:") + for p in patterns: + print(f" - {p['rule']}") + else: + print("No new patterns found.") + continue + elif cmd == '/analyze': + self.adaptive_learner.learn_from_session(self.session_id) + continue + elif cmd.startswith('/correct'): + reason = user_input[8:].strip() + last_response = "" + # Find last assistant message + for msg in reversed(self.context_messages): + if msg['role'] == 'assistant': + last_response = msg['content'] + break + self.save_correction(last_response, "", reason) + print("āœ… Correction saved. I'll try to remember that.") + continue + elif cmd == '/good': + if self.last_generated_id: + self.record_feedback(self.last_generated_id, True) + print("āœ… Feedback recorded: Positive") + else: + print("āŒ No recent message to rate.") + continue + elif cmd.startswith('/teach'): + rule = user_input[7:].strip() + if rule: + self.teach_rule(rule) + print(f"āœ… Learned rule: {rule}") + else: + print("Usage: /teach ") + continue + elif cmd == '/validate': + last_response = "" + for msg in reversed(self.context_messages): + if msg['role'] == 'assistant': + last_response = msg['content'] + break + + if not last_response: + print("āŒ No recent code to validate.") + continue + + code_blocks = self.extract_code(last_response) + if not code_blocks: + print("āŒ No code blocks found in last response.") + continue + + print("\nšŸ” Validating last response...") + all_valid = True + for i, code in enumerate(code_blocks, 1): + valid, issues = self.validator.validate(code, self.current_hardware) + if not valid: + all_valid = False + print(f"\nBlock {i} Issues:") + for issue in issues: + icon = "āŒ" if issue['severity'] == 'error' else "āš ļø" + print(f" {icon} Line {issue.get('line', '?')}: {issue['message']}") + else: + print(f"āœ… Block {i} is valid.") + + if all_valid: + print("\n✨ All code blocks look good!") + continue + elif cmd == '/rules': + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT rule_text, confidence, learned_from FROM code_rules ORDER BY confidence DESC") + rows = cursor.fetchall() + conn.close() + + if not rows: + print("🤷 No rules learned yet.") + else: + print(f"\n🧠 Learned Rules ({len(rows)}):") + for rule, conf, source in rows: + print(f" - [{conf:.1f}] {rule} ({source})") + continue + elif cmd == '/metrics': + stats = self.metrics.calculate_accuracy() + print("\nšŸ“Š Learning Metrics (Last 30 Days):") + print(f" Accuracy: {stats['accuracy']:.1f}%") + print(f" Correction Rate: {stats['correction_rate']:.1f}%") + print(f" Trend (7d): {stats['improvement']}") + print("") + continue + elif cmd == '/train': + result = self.fine_tuner.prepare_training_data() + print(f"āœ… {result}") + continue + elif cmd == '/backup': + success, msg = self.create_backup() + if success: + print(f"āœ… Database backed up to: {msg}") + else: + print(f"āŒ Backup failed: {msg}") + continue elif cmd.startswith('/save'): if 'json' in user_input.lower(): print(self.export_session_to_json()) @@ -1313,6 +2116,7 @@ if SERVER_AVAILABLE: class FeedbackRequest(BaseModel): message_id: int positive: bool + comment: str = "" class ResetGpuRequest(BaseModel): pass @@ -1583,7 +2387,9 @@ if SERVER_AVAILABLE: @app.post("/api/feedback") async def feedback_endpoint(req: FeedbackRequest, user_id: str = Header("default")): server_buddai = buddai_manager.get_instance(user_id) - server_buddai.record_feedback(req.message_id, req.positive) + new_response = server_buddai.record_feedback(req.message_id, req.positive, req.comment) + if new_response: + return {"status": "regenerated", "response": new_response, "message_id": server_buddai.last_generated_id} return {"status": "success"} @app.post("/api/system/reset-gpu") @@ -1592,6 +2398,11 @@ if SERVER_AVAILABLE: result = server_buddai.reset_gpu() return {"message": result} + @app.get("/api/system/metrics") + async def metrics_endpoint(user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + return server_buddai.metrics.calculate_accuracy() + @app.get("/api/system/status") async def system_status_endpoint(): mem_percent = 0 @@ -1602,6 +2413,20 @@ if SERVER_AVAILABLE: cpu_percent = psutil.cpu_percent(interval=None) return {"memory": mem_percent, "cpu": cpu_percent} + @app.get("/api/system/backup") + async def backup_endpoint(user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + success, path_or_err = server_buddai.create_backup() + + if success: + return FileResponse( + path=path_or_err, + filename=Path(path_or_err).name, + media_type='application/x-sqlite3' + ) + else: + return JSONResponse(status_code=500, content={"message": f"Backup failed: {path_or_err}"}) + @app.get("/api/utils/qrcode") async def qrcode_endpoint(url: str): if not qrcode: