docs: Add Remote Access Implementation Log detailing troubleshooting steps for Ngrok and Tailscale integration

This commit is contained in:
JamesTheGiblet 2026-01-01 17:08:45 +00:00
parent 10e57a83a2
commit 4e42a06618
11 changed files with 9930 additions and 2580 deletions

View file

@ -6,6 +6,7 @@
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Status: PRODUCTION](https://img.shields.io/badge/Status-PRODUCTION-green.svg)](https://github.com/JamesTheGiblet/BuddAI) [![Status: PRODUCTION](https://img.shields.io/badge/Status-PRODUCTION-green.svg)](https://github.com/JamesTheGiblet/BuddAI)
[![Version: v3.2](https://img.shields.io/badge/Version-v3.2-blue.svg)](https://github.com/JamesTheGiblet/BuddAI/releases) [![Version: v3.2](https://img.shields.io/badge/Version-v3.2-blue.svg)](https://github.com/JamesTheGiblet/BuddAI/releases)
[![Version: v3.8](https://img.shields.io/badge/Version-v3.8-blue.svg)](https://github.com/JamesTheGiblet/BuddAI/releases)
[![Tests: 24/24](https://img.shields.io/badge/Tests-24%2F24%20Passing-brightgreen.svg)](https://github.com/JamesTheGiblet/BuddAI/actions) [![Tests: 24/24](https://img.shields.io/badge/Tests-24%2F24%20Passing-brightgreen.svg)](https://github.com/JamesTheGiblet/BuddAI/actions)
--- ---
@ -40,29 +41,47 @@
- Added connection pooling - Added connection pooling
- **Milestone 6 Complete:** Production Hardening ✓ - **Milestone 6 Complete:** Production Hardening ✓
**Day 4+ (January 2026):**
- Implemented **Code Validator** with Auto-Fix engine
- Added **Smart Learner** to extract rules from corrections
- Integrated **Forge Theory** with interactive tuning
- Validated with 14-hour test suite (90% accuracy)
- **Milestone 7 Complete:** Self-Learning & Validation ✓
--- ---
### Result: BuddAI v3.2 - Hardened Modular Builder ### Result: BuddAI v3.2 - Hardened Modular Builder
### Result: BuddAI v3.8 - Self-Learning Modular Builder
✅ Remembers conversations across sessions ✅ Remembers conversations across sessions
✅ Routes to appropriate models automatically ✅ Routes to appropriate models automatically
✅ Breaks complex tasks into manageable modules ✅ Breaks complex tasks into manageable modules
**Indexes and searches your 115+ repositories** **Indexes and searches your 115+ repositories**
**Learns your coding style patterns** **Learns your coding style patterns**
**Proactively suggests missing components** **Proactively suggests missing components**
**Auto-corrects code errors before you see them**
**Beautiful web interface with live workspace** **Beautiful web interface with live workspace**
**Schedule-aware responses** **Schedule-aware responses**
**Interactive Forge Theory tuning**
✅ Generates complete, working code ✅ Generates complete, working code
✅ Works on slow hardware (8GB RAM) ✅ Works on slow hardware (8GB RAM)
✅ **Built in <2 weeks with $0 spent** ✅ **Built in <2 weeks with $0 spent**
**v3.2 New Capabilities:** **v3.2 New Capabilities:**
**v3.8 New Capabilities:**
- ✅ **WebSocket streaming** (real-time token-by-token responses) - ✅ **WebSocket streaming** (real-time token-by-token responses)
- ✅ **Multi-user support** (session isolation per user) - ✅ **Multi-user support** (session isolation per user)
- ✅ **Connection pooling** (faster Ollama communication) - ✅ **Connection pooling** (faster Ollama communication)
- ✅ **Upload security** (file size limits, type validation, zip slip protection) - ✅ **Upload security** (file size limits, type validation, zip slip protection)
- ✅ **Type hints** (improved code quality and IDE support) - ✅ **Type hints** (improved code quality and IDE support)
- ✅ **Auto-Fix Engine** (detects and fixes safety timeouts, state machines)
- ✅ **Smart Learner** (extracts rules from your corrections)
- ✅ **Validation Suite** (checks PWM, ADC, pins against hardware rules)
- ✅ **Forge Theory Mode** (Aggressive/Balanced/Graceful physics)
- ✅ **Learning Metrics** (tracks accuracy improvement over time)
--- ---
@ -96,6 +115,8 @@ BuddAI is a **personal IP AI exocortex** - an external cognitive system that ext
### What It Actually Does (v3.2) ### What It Actually Does (v3.2)
### What It Actually Does (v3.8)
**Simple Questions (5-10 seconds):** **Simple Questions (5-10 seconds):**
``` ```
@ -187,6 +208,8 @@ BuddAI: 🎯 COMPLEX REQUEST DETECTED!
### 🎯 Current Capabilities (v3.2) ### 🎯 Current Capabilities (v3.2)
### 🎯 Current Capabilities (v3.8)
**Core Features:** **Core Features:**
- ✅ Generate complete robot controllers - ✅ Generate complete robot controllers
@ -197,17 +220,22 @@ BuddAI: 🎯 COMPLEX REQUEST DETECTED!
- ✅ Work on slow hardware (8GB RAM) - ✅ Work on slow hardware (8GB RAM)
**v3.2 New Capabilities:** **v3.2 New Capabilities:**
**v3.8 New Capabilities:**
- ✅ **Search indexed repositories with natural language** - ✅ **Search indexed repositories with natural language**
- ✅ **Upload and index code via web interface** - ✅ **Upload and index code via web interface**
- ✅ **Style signature scanning and application** - ✅ **Style signature scanning and application**
- ✅ **Shadow suggestion engine** (proactive module suggestions) - ✅ **Shadow suggestion engine** (proactive module suggestions)
- ✅ **Schedule awareness** (knows your work/build cycles) - ✅ **Schedule awareness** (knows your work/build cycles)
- ✅ **Auto-Fix Engine** (corrects errors automatically)
- ✅ **Smart Learner** (extracts patterns from corrections)
- ✅ **Validation Report** (90% accuracy across 10-question suite)
- ✅ **Forge Theory mode selector** (Aggressive/Balanced/Graceful) - ✅ **Forge Theory mode selector** (Aggressive/Balanced/Graceful)
- ✅ **Session management** (rename/delete in web UI) - ✅ **Session management** (rename/delete in web UI)
- ✅ **Live code workspace** sidebar with syntax highlighting - ✅ **Live code workspace** sidebar with syntax highlighting
- ✅ **Dark/Light theme** toggle - ✅ **Dark/Light theme** toggle
- ✅ **Actionable suggestion pills** (click to apply) - ✅ **Actionable suggestion pills** (click to apply)
- ✅ **Learning Metrics** (track improvement)
- ✅ **Real-time status** indicators - ✅ **Real-time status** indicators
### 🔄 In Progress ### 🔄 In Progress
@ -223,6 +251,15 @@ BuddAI: 🎯 COMPLEX REQUEST DETECTED!
- Comprehensive integration tests - Comprehensive integration tests
**Timeline:** Completed **Timeline:** Completed
**Milestone 7: Self-Learning & Validation**
**Status:** ✅ COMPLETE (v3.8)
- Code Validator with Auto-Fix
- Smart Learner (correction analysis)
- Forge Theory integration
- 14-hour validation suite passed
**Timeline:** Completed (Jan 2026)
### 🔮 Future Vision ### 🔮 Future Vision
@ -276,6 +313,8 @@ BuddAI: 🎯 COMPLEX REQUEST DETECTED!
### Architecture (v3.2) ### Architecture (v3.2)
### Architecture (v3.8)
``` ```
┌─────────────────────────────────────────┐ ┌─────────────────────────────────────────┐
│ You (James) │ │ You (James) │

3443
archive/buddai_v3.8.py Normal file

File diff suppressed because it is too large Load diff

1902
buddai_executive.py Normal file

File diff suppressed because it is too large Load diff

600
buddai_logic.py Normal file
View file

@ -0,0 +1,600 @@
#!/usr/bin/env python3
import sys, os, json, logging, sqlite3, datetime, pathlib, http.client, re, typing, zipfile, shutil, queue, socket, argparse, io, difflib
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Tuple, Union, Generator
class CodeValidator:
"""Validate generated code before showing to user"""
def find_line(self, code: str, substring: str) -> int:
for i, line in enumerate(code.splitlines(), 1):
if substring in line:
return i
return -1
def has_safety_timeout(self, code: str) -> bool:
# Simple heuristic: needs millis, subtraction, and a comparison to a value/constant
# We want to avoid matching debounce logic (usually < 100ms)
if "millis()" not in code: return False
# Check for constants like SAFETY_TIMEOUT, MOTOR_TIMEOUT
if re.search(r'>\s*[A-Z_]*TIMEOUT', code):
return True
# Check for state machine timeout (Combat Protocol)
if "DISARM" in code and "millis" in code and ">" in code:
return True
# Check for numeric literals > 500 (Debounce is usually 50)
comparisons = re.findall(r'>\s*(\d+)', code)
return any(int(val) > 500 for val in comparisons)
def matches_style(self, code: str) -> bool:
# Placeholder for style matching logic
return True
def apply_style(self, code: str) -> str:
# Placeholder for style application
return code
def refactor_loop_to_function(self, code: str) -> str:
"""Extract loop body into runSystemLogic()"""
loop_match = re.search(r'void\s+loop\s*\(\s*\)\s*\{', code)
if not loop_match: return code
start_idx = loop_match.end()
brace_count = 1
loop_body_end = -1
for i, char in enumerate(code[start_idx:], start=start_idx):
if char == '{': brace_count += 1
elif char == '}': brace_count -= 1
if brace_count == 0:
loop_body_end = i
break
if loop_body_end == -1: return code
body = code[start_idx:loop_body_end]
new_code = code[:start_idx] + "\n runSystemLogic();\n" + code[loop_body_end:]
new_code += "\n\nvoid runSystemLogic() {" + body + "}\n"
return new_code
def validate(self, code: str, hardware: str, user_message: str = "") -> Tuple[bool, List[Dict]]:
"""Check code against known rules"""
issues = []
# Check 1: ESP32 PWM
if "ESP32" in hardware.upper():
if "analogWrite" in code:
issues.append({
"severity": "error",
"line": self.find_line(code, "analogWrite"),
"message": "ESP32 doesn't support analogWrite(). Use ledcWrite()",
"fix": lambda c: c.replace("analogWrite", "ledcWrite")
})
# Check 2: Non-blocking code
if "delay(" in code and "motor" in code.lower():
issues.append({
"severity": "warning",
"line": self.find_line(code, "delay"),
"message": "Using delay() in motor code blocks safety checks",
"fix": lambda c: c # No auto-fix
})
# Check 3: Safety timeout
if ("motor" in code.lower() or "servo" in code.lower()):
if not self.has_safety_timeout(code):
# Context-aware stop logic
is_servo = "Servo" in code and "L298N" not in code
stop_logic = " // STOP MOTORS\n ledcWrite(0, 0);\n ledcWrite(1, 0);"
if is_servo:
stop_logic = " // STOP SERVO\n // Implement safe position (e.g. myServo.write(90));"
issues.append({
"severity": "error",
"message": "Critical: No safety timeout detected (must be > 500ms).",
"fix": lambda c, sl=stop_logic: "#define SAFETY_TIMEOUT 5000\nunsigned long lastCommand = 0;\n" + \
re.sub(r'(void\s+loop\s*\(\s*\)\s*\{)', \
rf'\1\n // [AUTO-FIX] Safety Timeout\n if (millis() - lastCommand > SAFETY_TIMEOUT) {{\n{sl}\n }}\n', c)
})
# Check 4: L298N PWM Pin Misuse
pwm_pins = re.findall(r'ledcAttachPin\s*\(\s*(\w+)\s*,', code)
for pin in pwm_pins:
# Check if digitalWrite is used on this pin
if re.search(r'digitalWrite\s*\(\s*' + re.escape(pin) + r'\s*,', code):
issues.append({
"severity": "error",
"line": self.find_line(code, f"digitalWrite({pin}"),
"message": f"Conflict: PWM pin '{pin}' used with digitalWrite(). Use ledcWrite() for speed control.",
"fix": lambda c, p=pin: re.sub(r'digitalWrite\s*\(\s*' + re.escape(p) + r'\s*,\s*[^)]+\);?', f'// [Fixed] Removed conflicting digitalWrite on PWM pin {p}', c)
})
# Check 5: Broken Debounce Logic (Type Mismatch)
# Example: if (buttonState != lastDebounceTime)
bad_debounce = re.search(r'if\s*\(\s*\w+\s*[!=]=\s*\w*DebounceTime\s*\)', code)
if bad_debounce:
issues.append({
"severity": "error",
"line": self.find_line(code, bad_debounce.group(0)),
"message": "Type Mismatch: Comparing button state (int) with time (long).",
"fix": lambda c: c.replace(bad_debounce.group(0), "if ((millis() - lastDebounceTime) > debounceDelay)")
})
# Check 6: Safety Timeout Value
timeout_match = re.search(r'#define\s+SAFETY_TIMEOUT\s+(\d+)', code)
if timeout_match and int(timeout_match.group(1)) > 5000:
issues.append({
"severity": "error",
"line": self.find_line(code, timeout_match.group(0)),
"message": f"Safety timeout {timeout_match.group(1)}ms is too long (Max: 5000ms).",
"fix": lambda c: re.sub(r'(#define\s+SAFETY_TIMEOUT\s+)\d+', r'\g<1>5000', c)
})
# Check 7: Broken Safety Timer Logic (Static Init)
bad_static = re.search(r'static\s+unsigned\s+long\s+(\w+)\s*=\s*millis\(\);', code)
if bad_static:
issues.append({
"severity": "error",
"line": self.find_line(code, bad_static.group(0)),
"message": "Static timer initialized with millis() prevents reset. Initialize to 0.",
"fix": lambda c: c.replace(bad_static.group(0), f"static unsigned long {bad_static.group(1)} = 0;")
})
# Check 8: Incomplete Motor Logic (L298N Validation)
# If user explicitly asks for L298N or DC Motor, OR asks for 'motor' without 'servo'
is_l298n_request = "l298n" in user_message.lower() or "dc motor" in user_message.lower() or ("motor" in user_message.lower() and "servo" not in user_message.lower())
if is_l298n_request:
# 1. Check for Direction Pins (IN1/IN2)
if not re.search(r'(?:#define|const\s+int)\s+\w*(?:IN1|IN2|DIR)\w*', code, re.IGNORECASE):
issues.append({
"severity": "error",
"message": "Missing L298N Direction Pins (IN1/IN2).",
"fix": lambda c: "// [AUTO-FIX] L298N Definitions\n#define IN1 18\n#define IN2 19\n" + c
})
# 2. Check for PWM Pin (ENA)
if not re.search(r'(?:#define|const\s+int)\s+\w*(?:ENA|ENB|PWM)\w*', code, re.IGNORECASE):
issues.append({
"severity": "error",
"message": "Missing L298N PWM Pin (ENA).",
"fix": lambda c: "#define ENA 21 // [AUTO-FIX] Missing PWM Pin\n" + c
})
# 3. Check for Direction Control (digitalWrite)
if "digitalWrite" not in code:
issues.append({
"severity": "error",
"message": "L298N requires digitalWrite() for direction control.",
"fix": lambda c: re.sub(r'(void\s+loop\s*\(\s*\)\s*\{)', r'\1\n // [AUTO-FIX] Set Direction\n digitalWrite(IN1, HIGH);\n digitalWrite(IN2, LOW);\n', c)
})
# Check 9: Unnecessary Wire.h
wire_include = re.search(r'#include\s+[<"]Wire\.h[>"]', code)
if wire_include:
# Check if Wire is actually used (excluding the include itself)
rest_of_code = code.replace(wire_include.group(0), "")
if not re.search(r'\bWire\b', rest_of_code):
issues.append({
"severity": "error",
"line": self.find_line(code, wire_include.group(0)),
"message": "Unnecessary #include <Wire.h> detected.",
"fix": lambda c: re.sub(r'#include\s+[<"]Wire\.h[>"]', '// [Auto-Fix] Removed unnecessary Wire.h', c)
})
# Check 10: High-Frequency Serial Logging
if ("Serial.print" in code or "Serial.write" in code) and \
("motor" in code.lower() or "servo" in code.lower()):
# Check for throttling pattern (simple heuristic for timer variables)
if not re.search(r'(print|log|debug|serial)\s*Timer', code, re.IGNORECASE) and \
not re.search(r'last\s*(Print|Log|Debug)', code, re.IGNORECASE):
issues.append({
"severity": "warning",
"line": self.find_line(code, "Serial.print"),
"message": "Serial logging in motor loops causes jitter. Ensure it's throttled (e.g. every 100ms).",
"fix": lambda c: c + "\n// [Performance] Warning: Serial.print() inside loops can interrupt motor timing."
})
# Check 11: Feature Bloat (Unrequested Button)
if user_message:
msg_lower = user_message.lower()
# If user didn't ask for inputs/buttons
if not any(w in msg_lower for w in ['button', 'switch', 'input', 'trigger']):
# Pattern 1: Variable assignment (int btn = digitalRead(...))
for match in re.finditer(r'(?:int|bool|byte)\s+(\w*(?:button|btn|switch)\w*)\s*=\s*digitalRead\s*\([^;]+;', code, re.IGNORECASE):
issues.append({
"severity": "error",
"line": self.find_line(code, match.group(0)),
"message": f"Feature Bloat: Unrequested button code detected ('{match.group(1)}').",
"fix": lambda c, m=match.group(0): c.replace(m, "")
})
# Pattern 2: Direct usage in conditions (if (digitalRead(BUTTON_PIN)...))
for match in re.finditer(r'digitalRead\s*\(\s*(\w*(?:BUTTON|BTN|SWITCH)\w*)\s*\)', code, re.IGNORECASE):
issues.append({
"severity": "error",
"line": self.find_line(code, match.group(0)),
"message": f"Feature Bloat: Unrequested button check detected ('{match.group(1)}').",
"fix": lambda c, m=match.group(0): c.replace(m, "0")
})
# Pattern 3: pinMode(..., INPUT)
for match in re.finditer(r'pinMode\s*\(\s*\w+\s*,\s*INPUT(?:_PULLUP)?\s*\);', code):
issues.append({
"severity": "error",
"line": self.find_line(code, match.group(0)),
"message": "Feature Bloat: Unrequested input pin configuration.",
"fix": lambda c, m=match.group(0): c.replace(m, "")
})
# Pattern 4: Unused button variable initialization (int btn = LOW;)
for match in re.finditer(r'(?:int|bool|byte)\s+(\w*(?:button|btn|switch)\w*)\s*=\s*(?:LOW|HIGH|0|1|false|true)\s*;', code, re.IGNORECASE):
issues.append({
"severity": "error",
"line": self.find_line(code, match.group(0)),
"message": f"Feature Bloat: Unused button variable '{match.group(1)}'.",
"fix": lambda c, m=match.group(0): c.replace(m, "")
})
# Check 14: State Machine for Weapons (Combat Protocol)
if "weapon" in user_message.lower() or "combat" in user_message.lower() or "state machine" in user_message.lower():
if "enum" not in code and "bool isArmed" not in code:
issues.append({
"severity": "error",
"message": "Combat code requires a State Machine (enum State or bool isArmed).",
"fix": lambda c: c.replace("void setup", "\n// [AUTO-FIX] State Machine\nenum State { DISARMED, ARMING, ARMED, FIRING };\nState currentState = DISARMED;\nunsigned long stateTimer = 0;\n\nvoid setup") if "void setup" in c else "// [AUTO-FIX] State Machine\nenum State { DISARMED, ARMING, ARMED, FIRING };\nState currentState = DISARMED;\n" + c
})
if "Serial.read" not in code and "Serial.available" not in code:
issues.append({
"severity": "error",
"message": "Missing Serial Command handling (e.g., 'A' to Arm).",
"fix": lambda c: c.replace("void loop() {", "void loop() {\n if (Serial.available()) {\n char cmd = Serial.read();\n // Handle commands\n }\n")
})
# Check 15: Function Naming Conventions (camelCase)
# Exclude standard Arduino functions
func_defs = re.finditer(r'\b(void|int|bool|float|double|String|char|long|unsigned(?:\s+long)?)\s+([a-zA-Z0-9_]+)\s*\(', code)
for match in func_defs:
func_name = match.group(2)
if func_name in ['setup', 'loop', 'main']: continue
# Check if camelCase (starts with lowercase, no underscores unless specific style)
if not re.match(r'^[a-z][a-zA-Z0-9]*$', func_name):
# Check if it's snake_case or PascalCase
suggestion = func_name
if '_' in func_name: # snake_case -> camelCase
components = func_name.split('_')
suggestion = components[0].lower() + ''.join(x.title() for x in components[1:])
elif func_name[0].isupper(): # PascalCase -> camelCase
suggestion = func_name[0].lower() + func_name[1:]
issues.append({
"severity": "warning",
"line": self.find_line(code, match.group(0)),
"message": f"Style: Function '{func_name}' should be camelCase (e.g., '{suggestion}').",
"fix": lambda c, old=func_name, new=suggestion: c.replace(old, new)
})
# Check 16: Monolithic Code Structure
if "function" in user_message.lower() or "naming" in user_message.lower() or "modular" in user_message.lower():
has_custom_funcs = False
for match in re.finditer(r'\b(void|int|bool|float|double|String|char|long|unsigned(?:\s+long)?)\s+([a-zA-Z0-9_]+)\s*\(', code):
if match.group(2) not in ['setup', 'loop', 'main']:
has_custom_funcs = True
break
if not has_custom_funcs:
issues.append({
"severity": "error",
"message": "Structure Violation: Request asked for functions but code is monolithic.",
"fix": lambda c: c.replace("void loop() {", "void loop() {\n runSystemLogic();\n}\n\nvoid runSystemLogic() {") + "\n}"
})
# Check 17: Loop Length (Modularity)
if "function" in user_message.lower() or "naming" in user_message.lower() or "modular" in user_message.lower():
loop_match = re.search(r'void\s+loop\s*\(\s*\)\s*\{', code)
if loop_match:
start_idx = loop_match.end()
brace_count = 1
loop_body = ""
for char in code[start_idx:]:
if char == '{': brace_count += 1
elif char == '}': brace_count -= 1
if brace_count == 0:
break
loop_body += char
# Count significant lines
lines = [line.strip() for line in loop_body.split('\n')]
significant_lines = [l for l in lines if l and not l.startswith('//') and not l.startswith('/*') and l != '']
if len(significant_lines) >= 10:
issues.append({
"severity": "error",
"message": f"Modularity Violation: loop() has {len(significant_lines)} lines (limit 10). Move logic to functions.",
"fix": lambda c: self.refactor_loop_to_function(c)
})
# Check 18: ADC Resolution (ESP32)
if "ESP32" in hardware.upper():
adc_res_match = re.search(r'#define\s+(\w*ADC\w*RES\w*)\s+(\d+)', code, re.IGNORECASE)
if adc_res_match:
val = int(adc_res_match.group(2))
if val not in [4095, 4096]:
issues.append({
"severity": "error",
"line": self.find_line(code, adc_res_match.group(0)),
"message": f"Hardware Mismatch: ESP32 ADC is 12-bit (4095), not {val}.",
"fix": lambda c, old=adc_res_match.group(0), name=adc_res_match.group(1): c.replace(old, f"#define {name} 4095")
})
# Check 20: Hardcoded 10-bit ADC math
# Matches / 1023, / 1023.0, / 1024.0 (avoiding / 1024 int for bytes)
for match in re.finditer(r'/\s*(1023(?:\.0?)?f?|1024(?:\.0)f?)', code):
issues.append({
"severity": "error",
"line": self.find_line(code, match.group(0)),
"message": "Hardware Mismatch: ESP32 ADC is 12-bit. Use 4095.0, not 1023/1024.",
"fix": lambda c, m=match.group(0): c.replace(m, "/ 4095.0")
})
# Check 21: Status LED Pattern
if "status" in user_message.lower() and ("led" in user_message.lower() or "indicator" in user_message.lower()):
# Detect breathing logic (incrementing duty cycle in loop)
breathing_match = re.search(r'(?:dutyCycle|brightness)\s*(\+=|\+\+|\-=|\-\-)', code)
if breathing_match:
issues.append({
"severity": "error",
"line": self.find_line(code, breathing_match.group(0)),
"message": "Wrong Pattern: Status indicators should use Blink Patterns (States), not Breathing/Fading.",
"fix": lambda c: c + "\n// [Fix Required] Implement setStatusLED(LEDStatus state) instead of fading."
})
# Check for missing Enum
if not re.search(r'enum\s+(?:StatusState|LEDStatus)\s*\{', code):
issues.append({
"severity": "error",
"message": "Missing Status Enum: Status LEDs require a state machine (enum LEDStatus {OFF, IDLE, ACTIVE, ERROR}).",
"fix": lambda c: c.replace("void setup", "\n// [AUTO-FIX] Status Enum\nenum LEDStatus { OFF, IDLE, ACTIVE, ERROR };\nLEDStatus currentStatus = IDLE;\nunsigned long lastBlink = 0;\n\nvoid setup") if "void setup" in c else "// [AUTO-FIX] Status Enum\nenum LEDStatus { OFF, IDLE, ACTIVE, ERROR };\nLEDStatus currentStatus = IDLE;\nunsigned long lastBlink = 0;\n" + c
})
# Check 19: Unnecessary Debouncing (Analog/Battery)
if "battery" in user_message.lower() or "voltage" in user_message.lower() or "analog" in user_message.lower():
if "button" not in user_message.lower():
debounce_match = re.search(r'(?:debounce|lastDebounceTime)', code, re.IGNORECASE)
if debounce_match:
issues.append({
"severity": "error",
"line": self.find_line(code, debounce_match.group(0)),
"message": "Logic Error: Debouncing detected in analog/battery code. Analog sensors don't need debouncing.",
"fix": lambda c: re.sub(r'.*debounce.*', '// [Fixed] Removed unnecessary debounce logic', c, flags=re.IGNORECASE)
})
# Check 12: Undefined Pin Constants
pin_vars = set(re.findall(r'(?:digitalRead|digitalWrite|pinMode|ledcAttachPin)\s*\(\s*([a-zA-Z_]\w+)', code))
for var in pin_vars:
if var in ['LED_BUILTIN', 'HIGH', 'LOW', 'INPUT', 'OUTPUT', 'INPUT_PULLUP', 'true', 'false']:
continue
# Check if defined
is_defined = re.search(r'#define\s+' + re.escape(var) + r'\b', code) or \
re.search(r'\b(?:const\s+)?(?:int|byte|uint8_t|short)\s+' + re.escape(var) + r'\s*=', code)
if not is_defined:
issues.append({
"severity": "error",
"message": f"Undefined variable '{var}' used in pin operation.",
"fix": lambda c, v=var: f"#define {v} 2 // [Auto-Fix] Defined missing pin\n" + c
})
# Check 22: Misused Debouncing (Animation Timing)
if "brightness" in code or "fade" in code:
misused_debounce = re.search(r'if\s*\(\s*\(?\s*millis\(\)\s*-\s*\w+\s*\)?\s*>\s*(\w*DEBOUNCE\w*)\s*\)\s*\{', code, re.IGNORECASE)
if misused_debounce:
var_name = misused_debounce.group(1)
# Check if the block actually modifies brightness (simple heuristic lookahead)
start_index = misused_debounce.end()
snippet = code[start_index:start_index+200]
if any(x in snippet for x in ['brightness', 'fade', 'dutyCycle', 'ledcWrite']):
issues.append({
"severity": "error",
"line": self.find_line(code, var_name),
"message": f"Semantic Error: Using {var_name} for animation/fading. Use UPDATE_INTERVAL or FADE_SPEED.",
"fix": lambda c, v=var_name: c.replace(v, "FADE_SPEED" if v.isupper() else "fadeSpeed")
})
# Check 24: Unused Variables in Setup
setup_match = re.search(r'void\s+setup\s*\(\s*\)\s*\{', code)
if setup_match:
start_idx = setup_match.end()
brace_count = 1
setup_body = ""
for char in code[start_idx:]:
if char == '{': brace_count += 1
elif char == '}': brace_count -= 1
if brace_count == 0: break
setup_body += char
clean_body = re.sub(r'//.*', '', setup_body)
clean_body = re.sub(r'/\*.*?\*/', '', clean_body, flags=re.DOTALL)
local_vars = re.finditer(r'\b((?:static\s+)?(?:const\s+)?(?:int|float|bool|char|String|long|double|byte|uint8_t|unsigned(?:\s+long)?))\s+([a-zA-Z_]\w*)\s*(?:=|;)', clean_body)
for match in local_vars:
var_type = match.group(1)
var_name = match.group(2)
if len(re.findall(r'\b' + re.escape(var_name) + r'\b', clean_body)) == 1:
issues.append({
"severity": "warning",
"line": self.find_line(code, f"{var_type} {var_name}"),
"message": f"Unused variable '{var_name}' in setup().",
"fix": lambda c, v=var_name, t=var_type: re.sub(r'\b' + re.escape(t) + r'\s+' + re.escape(v) + r'[^;]*;\s*', '', c)
})
# Check 25: Missing Serial.begin
if re.search(r'Serial\.(?:print|write|println|printf)', code) and not re.search(r'Serial\.begin\s*\(', code):
issues.append({
"severity": "error",
"message": "Missing Serial.begin() initialization.",
"fix": lambda c: re.sub(r'void\s+setup\s*\(\s*\)\s*\{', r'void setup() {\n Serial.begin(115200);', c, count=1)
})
# Check 26: Missing Wire.begin
if re.search(r'Wire\.(?!h\b|begin\b)', code) and not re.search(r'Wire\.begin\s*\(', code):
issues.append({
"severity": "error",
"message": "Missing Wire.begin() initialization for I2C.",
"fix": lambda c: re.sub(r'void\s+setup\s*\(\s*\)\s*\{', r'void setup() {\n Wire.begin();', c, count=1)
})
return len([i for i in issues if i['severity'] == 'error']) == 0, issues
def auto_fix(self, code: str, issues: List[Dict]) -> str:
"""Automatically fix known issues"""
fixed_code = code
for issue in issues:
if 'fix' in issue and issue['severity'] == 'error':
fixed_code = issue['fix'](fixed_code)
return fixed_code
class HardwareProfile:
"""Learn hardware-specific patterns"""
ESP32_PATTERNS = {
"pwm_setup": {
"correct": "ledcSetup(channel, freq, resolution)",
"wrong": ["analogWrite", "pwmWrite"],
"learned_from": "James's corrections"
},
"serial_baud": {
"preferred": 115200,
"alternatives": [9600, 57600],
"confidence": 1.0
},
"safety_timeout": {
"standard": 5000,
"pattern": "millis() - lastTime > TIMEOUT",
"confidence": 1.0
}
}
HARDWARE_KEYWORDS = {
"ESP32-C3": ["esp32", "esp32c3", "c3", "esp-32"],
"Arduino Uno": ["uno", "arduino uno", "atmega328p"],
"Raspberry Pi Pico": ["pico", "rp2040"]
}
def detect_hardware(self, message: str) -> Optional[str]:
msg_lower = message.lower()
for hw, keywords in self.HARDWARE_KEYWORDS.items():
if any(k in msg_lower for k in keywords):
return hw
return None
def apply_hardware_rules(self, code: str, hardware: str) -> str:
"""Apply known hardware patterns"""
if hardware == "ESP32-C3":
# Apply ESP32-specific fixes
code = self.fix_pwm(code)
code = self.fix_serial(code)
code = self.add_safety(code)
return code
def fix_pwm(self, code: str) -> str:
for wrong in self.ESP32_PATTERNS["pwm_setup"]["wrong"]:
if wrong in code:
if wrong == "analogWrite":
code = code.replace("analogWrite", "ledcWrite")
return code
def fix_serial(self, code: str) -> str:
preferred = self.ESP32_PATTERNS["serial_baud"]["preferred"]
return re.sub(r'Serial\.begin\(\s*\d+\s*\)', f'Serial.begin({preferred})', code)
def add_safety(self, code: str) -> str:
if "motor" in code.lower() and "millis()" not in code:
code += "\n// [BuddAI Safety] Warning: No non-blocking timeout detected. Consider adding safety timeout."
return code
class LearningMetrics:
"""Measure BuddAI's improvement over time"""
def calculate_accuracy(self):
"""What % of code is accepted without correction?"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat()
cursor.execute("""
SELECT
COUNT(*) as total_responses,
COUNT(CASE WHEN f.positive = 1 THEN 1 END) as positive_feedback,
COUNT(CASE WHEN c.id IS NOT NULL THEN 1 END) as corrected
FROM messages m
LEFT JOIN feedback f ON m.id = f.message_id
LEFT JOIN corrections c ON m.content LIKE '%' || c.original_code || '%'
WHERE m.role = 'assistant'
AND m.timestamp > ?
""", (thirty_days_ago,))
total, positive, corrected = cursor.fetchone()
conn.close()
accuracy = (positive / total) * 100 if total and total > 0 else 0
correction_rate = (corrected / total) * 100 if total and total > 0 else 0
return {
"accuracy": accuracy,
"correction_rate": correction_rate,
"improvement": self.calculate_trend()
}
def calculate_trend(self):
"""Is BuddAI getting better over time?"""
# Compare last 7 days vs previous 7 days
recent = self.get_accuracy_for_period(7)
previous = self.get_accuracy_for_period(7, offset=7)
improvement = recent - previous
return f"+{improvement:.1f}%" if improvement > 0 else f"{improvement:.1f}%"
def get_accuracy_for_period(self, days: int, offset: int = 0) -> float:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
start_dt = (datetime.now() - timedelta(days=days + offset)).isoformat()
end_dt = (datetime.now() - timedelta(days=offset)).isoformat()
cursor.execute("""
SELECT
COUNT(*) as total,
COUNT(CASE WHEN f.positive = 1 THEN 1 END) as positive
FROM messages m
LEFT JOIN feedback f ON m.id = f.message_id
WHERE m.role = 'assistant'
AND m.timestamp BETWEEN ? AND ?
""", (start_dt, end_dt))
row = cursor.fetchone()
conn.close()
if not row:
return 0.0
total, positive = row
return (positive / total) * 100 if total and total > 0 else 0.0

355
buddai_memory.py Normal file
View file

@ -0,0 +1,355 @@
#!/usr/bin/env python3
import sys, os, json, logging, sqlite3, datetime, pathlib, http.client, re, typing, zipfile, shutil, queue, socket, argparse, io, difflib
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Tuple, Union, Generator
from buddai_shared import DB_PATH, MODULE_PATTERNS
class ShadowSuggestionEngine:
"""Proactively suggests modules/settings based on user/project history."""
def __init__(self, db_path: Path, user_id: str = "default"):
self.db_path = db_path
self.user_id = user_id
def lookup_recent_module_usage(self, module: str, limit: int = 5) -> List[Tuple[str, str, str]]:
"""Look up recent usage patterns for a module from repo_index."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT file_path, content, last_modified FROM repo_index
WHERE (function_name LIKE ? OR file_path LIKE ?) AND user_id = ?
ORDER BY last_modified DESC LIMIT ?
""",
(f"%{module}%", f"%{module}%", self.user_id, limit)
)
results = cursor.fetchall()
conn.close()
return results
def suggest_for_module(self, module: str) -> Optional[str]:
"""Return a proactive suggestion string for a module if pattern detected."""
history = self.lookup_recent_module_usage(module)
if not history:
return None
# Example: For 'motor', look for L298N and PWM frequency
l298n_count = 0
pwm_freqs = []
for _, content, _ in history:
if "L298N" in content or "l298n" in content:
l298n_count += 1
pwm_matches = re.findall(r'PWM_FREQ\s*=\s*(\d+)', content)
pwm_freqs.extend([int(f) for f in pwm_matches])
# Also look for explicit frequency in analogWrite or ledcSetup
freq_matches = re.findall(r'(?:ledcSetup|analogWrite)\s*\([^,]+,\s*[^,]+,\s*(\d+)\)', content)
pwm_freqs.extend([int(f) for f in freq_matches if f.isdigit()])
if l298n_count >= 2:
freq = max(set(pwm_freqs), key=pwm_freqs.count) if pwm_freqs else 500
return f"I see you usually use the L298N with a {freq}Hz PWM frequency on the ESP32-C3. Should I prep that module?"
return None
def get_proactive_suggestion(self, user_input: str) -> Optional[str]:
"""
V3.0 Proactive Hook:
1. Identify "Concept" (e.g., 'flipper')
2. Query repo_index for James's most frequent companion modules
3. If 'flipper' often appears with 'safety_timeout', suggest it.
"""
# 1. Identify Concepts
input_lower = user_input.lower()
detected_modules = []
for module, keywords in MODULE_PATTERNS.items():
if any(kw in input_lower for kw in keywords):
detected_modules.append(module)
if not detected_modules:
return None
# 2. Query repo_index for correlations
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
suggestions = []
for module in detected_modules:
# Find files containing this module (simple heuristic)
cursor.execute("SELECT content FROM repo_index WHERE content LIKE ? AND user_id = ? LIMIT 10", (f"%{module}%", self.user_id))
rows = cursor.fetchall()
if not rows: continue
# Check for companion modules
companions = {}
for (content,) in rows:
content_lower = content.lower()
for other_mod, other_kws in MODULE_PATTERNS.items():
if other_mod != module and other_mod not in detected_modules:
if any(kw in content_lower for kw in other_kws):
companions[other_mod] = companions.get(other_mod, 0) + 1
# 3. Suggest if frequent (>50% correlation in sample)
for other_mod, count in companions.items():
if count >= len(rows) * 0.5:
suggestions.append(f"I noticed '{module}' often appears with '{other_mod}' in your repos. Want to include that?")
conn.close()
return " ".join(list(set(suggestions))) if suggestions else None
def get_all_suggestions(self, user_input: str, generated_code: str) -> List[str]:
"""Aggregate all proactive suggestions into a list."""
suggestions = []
# 1. Companion Modules
companion = self.get_proactive_suggestion(user_input)
if companion:
suggestions.append(companion)
# 2. Module Settings
input_lower = user_input.lower()
for module, keywords in MODULE_PATTERNS.items():
if any(kw in input_lower for kw in keywords):
s = self.suggest_for_module(module)
if s:
suggestions.append(s)
# 3. Forge Theory Check
if ("motor" in input_lower or "servo" in input_lower) and "applyForge" not in generated_code:
suggestions.append("Apply Forge Theory smoothing to movement?")
# 4. Safety Check (L298N)
if "L298N" in generated_code and "safety" not in generated_code.lower():
suggestions.append("Drive system lacks safety timeout (GilBot_V2 uses 5s failsafe). Add that?")
return suggestions
class AdaptiveLearner:
"""Learn from every interaction"""
def learn_from_session(self, session_id: str):
"""Analyze what worked/failed in a session"""
print(f"🧠 Adaptive Learning: Analyzing Session {session_id}...")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get all messages in session
cursor.execute("""
SELECT id, role, content
FROM messages
WHERE session_id = ?
ORDER BY id ASC
""", (session_id,))
messages = cursor.fetchall()
conn.close()
count = 0
# Look for correction patterns
for i, (msg_id, role, content) in enumerate(messages):
if role == 'user' and i > 0:
prev_msg = messages[i-1]
prev_role = prev_msg[1]
prev_content = prev_msg[2]
if prev_role == 'assistant':
# Did James correct the previous response?
if self.is_correction(content, prev_content):
print(f" - Detected correction in msg #{msg_id}")
self.learn_correction(prev_content, content)
count += 1
# Did James ask for modification?
if self.is_modification(content):
print(f" - Detected preference in msg #{msg_id}")
self.learn_preference(content)
count += 1
if count == 0:
print(" - No obvious corrections found.")
def is_correction(self, user_msg: str, ai_msg: str) -> bool:
"""Detect if user is correcting AI"""
correction_signals = [
"actually", "no,", "wrong", "should be", "instead of",
"not", "use", "don't use", "change", "fix", "error", "bug"
]
return any(signal in user_msg.lower() for signal in correction_signals)
def is_modification(self, user_msg: str) -> bool:
"""Detect if user is expressing a preference"""
signals = ["prefer", "i like", "always use", "style", "better", "make it"]
return any(s in user_msg.lower() for s in signals)
def learn_correction(self, original: str, correction: str):
"""Extract the lesson from a correction"""
# Save the rule (Generic capture for now)
rule_text = correction.split('\n')[0][:100]
self.save_rule(rule_text, "context_dependent", correction[:100], confidence=0.5)
def learn_preference(self, content: str):
"""Extract preference"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO style_preferences (user_id, category, preference, confidence, extracted_at)
VALUES (?, ?, ?, ?, ?)
""", ("default", "learned_preference", content[:200], 0.6, datetime.now().isoformat()))
conn.commit()
conn.close()
def save_rule(self, rule_text, find, replace, confidence):
"""Save to code_rules table"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO code_rules
(rule_text, pattern_find, pattern_replace, confidence, learned_from)
VALUES (?, ?, ?, ?, ?)
""", (rule_text, find, replace, confidence, 'adaptive_session'))
conn.commit()
conn.close()
class SmartLearner:
"""Extract patterns from corrections"""
def analyze_corrections(self, ai_interface=None):
"""Find common patterns in your fixes"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Ensure processed column exists
try:
cursor.execute("ALTER TABLE corrections ADD COLUMN processed BOOLEAN DEFAULT 0")
except sqlite3.OperationalError:
pass
# Check pending count
cursor.execute("SELECT COUNT(*) FROM corrections WHERE processed IS NOT 1")
pending_count = cursor.fetchone()[0]
if pending_count == 0:
conn.close()
return []
# Process in small batches
cursor.execute("""
SELECT id, original_code, corrected_code, reason
FROM corrections
WHERE processed IS NOT 1
LIMIT 5
""")
corrections = cursor.fetchall()
print(f" Processing {len(corrections)} of {pending_count} pending corrections...")
patterns = []
for row_id, original, corrected, reason in corrections:
# Strategy 1: Diff based (if corrected code exists)
if corrected and original:
# Extract what changed
diff = self.diff_code(original, corrected)
# Classify the change
if "analogWrite" in original and "ledcWrite" in corrected:
patterns.append({
"rule": "ESP32 uses ledcWrite not analogWrite",
"find": "analogWrite",
"replace": "ledcWrite",
"hardware": "ESP32",
"confidence": 1.0
})
if "delay(" in original and "millis()" in corrected:
patterns.append({
"rule": "Use non-blocking millis() not delay()",
"find": "delay\\(",
"replace": "millis() based timing",
"confidence": 0.9
})
# Strategy 2: Reason based (LLM extraction)
if reason and ai_interface:
print(f" - Analyzing #{row_id}...", end="\r")
# Use LLM to extract rule from text reason
prompt = f"""Analyze this correction text and extract specific technical coding rules.
Ignore conversational filler.
Correction Text:
"{reason}"
Return ONLY a list of rules in this format:
Rule: <concise technical rule>
"""
try:
response = ai_interface.call_model("fast", prompt, system_task=True)
for line in response.splitlines():
clean_line = line.strip().replace("**", "").replace("__", "")
rule_text = None
if "rule:" in clean_line.lower():
parts = clean_line.split(":", 1)
rule_text = parts[1].strip() if len(parts) > 1 else clean_line
elif re.match(r'^[\d-]+\.', clean_line) or clean_line.startswith("- "):
rule_text = re.sub(r'^[\d-]+\.?\s*', '', clean_line).strip()
if rule_text and len(rule_text) > 10 and rule_text != reason:
patterns.append({
"rule": rule_text,
"find": "",
"replace": "",
"confidence": 0.85
})
except Exception:
pass
# Mark as processed immediately
cursor.execute("UPDATE corrections SET processed = 1 WHERE id = ?", (row_id,))
conn.commit()
print(" - Batch complete. ")
conn.close()
# Store learned rules
if patterns:
self.save_rules(patterns)
return patterns
def save_rules(self, patterns):
"""Save to code_rules table"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS code_rules (
id INTEGER PRIMARY KEY,
rule_text TEXT,
pattern_find TEXT,
pattern_replace TEXT,
context TEXT,
confidence FLOAT,
learned_from TEXT,
times_applied INTEGER DEFAULT 0
)
""")
for p in patterns:
cursor.execute("""
INSERT OR REPLACE INTO code_rules
(rule_text, pattern_find, pattern_replace, confidence, learned_from)
VALUES (?, ?, ?, ?, ?)
""", (p['rule'], p['find'], p['replace'], p['confidence'], 'corrections'))
conn.commit()
conn.close()
def diff_code(self, original: str, corrected: str) -> str:
"""Generate a simple diff"""
return "\n".join(difflib.unified_diff(
original.splitlines(),
corrected.splitlines(),
fromfile='original',
tofile='corrected',
lineterm=''
))

651
buddai_server.py Normal file
View file

@ -0,0 +1,651 @@
#!/usr/bin/env python3
import sys, os, json, logging, sqlite3, datetime, pathlib, http.client, re, typing, zipfile, shutil, queue, socket, argparse, io, difflib
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Tuple, Union, Generator
from fastapi import FastAPI
import uvicorn
from buddai_shared import SERVER_AVAILABLE, DATA_DIR, DB_PATH, MODELS, OLLAMA_HOST, OLLAMA_PORT
from buddai_executive import BuddAI
# (Removed duplicate definitions of check_ollama, is_port_available, and main to resolve indentation and duplication errors)
from fastapi.middleware.cors import CORSMiddleware
from fastapi import File, UploadFile, Header, WebSocket, WebSocketDisconnect, Request, Response
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from urllib.parse import urlparse
try:
import psutil
except ImportError:
psutil = None
try:
import qrcode
except ImportError:
qrcode = None
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
MAX_UPLOAD_FILES = 20
ALLOWED_TYPES = [
"application/zip", "application/x-zip-compressed",
"text/x-python", "text/plain", "application/octet-stream",
"text/x-c++src", "text/x-c++hdr", "text/javascript",
"text/html", "text/css"
]
app = FastAPI(title="BuddAI API", version="3.2")
# Allow React frontend to communicate
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class ChatRequest(BaseModel):
message: str
model: Optional[str] = None
forge_mode: Optional[str] = "2"
class SessionLoadRequest(BaseModel):
session_id: str
class SessionRenameRequest(BaseModel):
session_id: str
title: str
class SessionDeleteRequest(BaseModel):
session_id: str
class FeedbackRequest(BaseModel):
message_id: int
positive: bool
comment: str = ""
class ResetGpuRequest(BaseModel):
pass
# Multi-user support
def check_ollama() -> bool:
try:
conn = http.client.HTTPConnection(OLLAMA_HOST, OLLAMA_PORT, timeout=5)
conn.request("GET", "/api/tags")
response = conn.getresponse()
if response.status == 200:
data = json.loads(response.read().decode('utf-8'))
conn.close()
installed_models = [m['name'] for m in data.get('models', [])]
missing = [m for m in MODELS.values() if m not in installed_models]
if missing:
print(f"⚠️ WARNING: Missing models in Ollama: {', '.join(missing)}")
print(f" Run in host terminal: ollama pull {' && ollama pull '.join(missing)}")
return True
return False
except Exception:
return False
def is_port_available(port: int, host: str = "0.0.0.0") -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return True
except socket.error:
return False
def main() -> None:
if not check_ollama():
print(f"❌ Ollama not running at {OLLAMA_HOST}:{OLLAMA_PORT}. Ensure it is running and accessible.")
sys.exit(1)
parser = argparse.ArgumentParser(description="BuddAI Executive")
parser.add_argument("--server", action="store_true", help="Run in server mode")
parser.add_argument("--port", type=int, default=8000, help="Port for server mode")
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host IP address")
parser.add_argument("--public-url", type=str, default="", help="Public URL for QR codes")
args = parser.parse_args()
if args.server:
if SERVER_AVAILABLE:
port = args.port
if not is_port_available(port, args.host):
print(f"⚠️ Port {port} is in use.")
for i in range(1, 11):
if is_port_available(port + i, args.host):
port += i
print(f"🔄 Switching to available port: {port}")
break
else:
print(f"❌ Could not find available port in range {args.port}-{args.port+10}")
sys.exit(1)
# Silence health check logs from frontend polling
class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
msg = record.getMessage()
return "/api/system/status" not in msg and '"GET / HTTP/1.1" 200' not in msg
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
print(f"🚀 Starting BuddAI API Server on port {port}...")
if args.public_url:
print(f"🔗 Public Access: {args.public_url}")
app.state.public_url = args.public_url
uvicorn.run(app, host=args.host, port=port)
else:
print("❌ Server dependencies missing. Install: pip install fastapi uvicorn aiofiles python-multipart")
else:
buddai = BuddAI()
buddai.run()
class BuddAIManager:
def __init__(self):
self.instances: Dict[str, BuddAI] = {}
def get_instance(self, user_id: str) -> BuddAI:
if user_id not in self.instances:
self.instances[user_id] = BuddAI(user_id=user_id, server_mode=True)
return self.instances[user_id]
buddai_manager = BuddAIManager()
# Serve Frontend
frontend_path = Path(__file__).parent / "frontend"
frontend_path.mkdir(exist_ok=True)
app.mount("/web", StaticFiles(directory=frontend_path, html=True), name="web")
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
server_buddai = buddai_manager.get_instance("default")
status = server_buddai.get_user_status()
public_url = getattr(request.app.state, "public_url", "")
qr_section = ""
ip_section = ""
if public_url:
parsed = urlparse(public_url)
host = parsed.hostname
label = "Server Address"
color = "#fff"
if host:
if host.startswith("100."):
label = "Tailscale IP"
color = "#ff79c6" # Magenta
elif host.startswith("192.168.") or host.startswith("10.") or host.startswith("172."):
label = "LAN IP"
color = "#50fa7b" # Green
elif "ngrok" in public_url:
label = "Public Tunnel"
color = "#8be9fd" # Cyan
ip_section = f"""
<div style="margin: 20px 0; text-align: center;">
<p style="margin: 0; font-size: 0.8em; color: #888; text-transform: uppercase; letter-spacing: 1px;">{label}</p>
<h2 style="margin: 5px 0; font-size: 1.8em; color: {color}; font-family: monospace; background: rgba(0,0,0,0.2); padding: 10px; border-radius: 8px; border: 1px solid rgba(255,255,255,0.1);">{host}</h2>
</div>
"""
qr_section = f"""
<div style="margin-top: 20px; text-align: center; background: rgba(255,255,255,0.05); padding: 15px; border-radius: 10px;">
<p style="margin: 0 0 10px 0; font-size: 0.9em; color: #aaa;">Scan to Connect</p>
<img src="/api/utils/qrcode?url={public_url}" style="width: 150px; height: 150px; border-radius: 8px; display: block; margin: 0 auto;">
</div>
"""
# System Stats
mem_usage = "N/A"
if psutil:
process = psutil.Process(os.getpid())
mem_usage = f"{process.memory_info().rss / 1024 / 1024:.0f} MB"
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sessions")
total_sessions = cursor.fetchone()[0]
conn.close()
return f"""
<html>
<head>
<title>BuddAI API (Dev Mode)</title>
<link rel="icon" href="/favicon.ico">
<style>
body {{
background: linear-gradient(135deg, #111 0%, #1a1a1a 100%);
color: #e0e0e0;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
margin: 0;
}}
.dashboard {{
display: flex;
gap: 15px;
margin: 20px 0;
width: 100%;
justify-content: center;
}}
.stat-card {{
background: rgba(255, 255, 255, 0.05);
padding: 15px;
border-radius: 10px;
min-width: 80px;
border: 1px solid rgba(255, 255, 255, 0.1);
}}
.stat-value {{
display: block;
font-size: 1.2em;
font-weight: bold;
color: #fff;
}}
.stat-label {{
font-size: 0.8em;
color: #888;
}}
.container {{
text-align: center;
background: rgba(255, 255, 255, 0.03);
padding: 40px;
border-radius: 16px;
box-shadow: 0 4px 30px rgba(0, 0, 0, 0.3);
backdrop-filter: blur(5px);
border: 1px solid rgba(255, 255, 255, 0.05);
max-width: 400px;
width: 90%;
}}
img {{
width: 120px;
margin-bottom: 1.5rem;
filter: drop-shadow(0 0 15px rgba(255, 152, 0, 0.3));
animation: float 6s ease-in-out infinite;
}}
h1 {{ margin: 0 0 10px 0; font-weight: 600; letter-spacing: 0.5px; color: #fff; }}
p {{ margin: 10px 0; color: #888; font-size: 0.95em; }}
strong {{ color: #ddd; }}
.links {{ margin-top: 30px; display: flex; gap: 15px; justify-content: center; }}
a {{
text-decoration: none;
color: #fff;
background: #0e639c;
padding: 10px 20px;
border-radius: 6px;
transition: all 0.2s;
font-weight: 600;
font-size: 0.9em;
}}
a:hover {{ background: #1177bb; transform: translateY(-2px); }}
a.secondary {{ background: transparent; border: 1px solid #444; color: #ccc; }}
a.secondary:hover {{ background: #333; border-color: #666; color: #fff; }}
@keyframes float {{
0% {{ transform: translateY(0px); }}
50% {{ transform: translateY(-10px); }}
100% {{ transform: translateY(0px); }}
}}
</style>
</head>
<body>
<div class="container">
<img src="/favicon.ico" alt="BuddAI">
<h1>BuddAI API</h1>
<p>Status: <span style="color: #4caf50; font-weight: bold;"> Online</span></p>
<p>Context: <strong>{status}</strong></p>
<div class="dashboard">
<div class="stat-card">
<span class="stat-value">{mem_usage}</span>
<span class="stat-label">Memory</span>
</div>
<div class="stat-card">
<span class="stat-value">{total_sessions}</span>
<span class="stat-label">Sessions</span>
</div>
<div class="stat-card">
<span class="stat-value">{len(buddai_manager.instances)}</span>
<span class="stat-label">Active Users</span>
</div>
</div>
<div class="links">
<a href="/web">Launch Web UI</a>
<a href="/docs" class="secondary">API Docs</a>
</div>
{ip_section}
{qr_section}
</div>
</body>
</html>
"""
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
return FileResponse(Path(__file__).parent / "icons" / "icon.png")
@app.get("/favicon-16x16.png", include_in_schema=False)
async def favicon_16():
return FileResponse(Path(__file__).parent / "icons" / "favicon-16x16.png")
@app.get("/favicon-32x32.png", include_in_schema=False)
async def favicon_32():
return FileResponse(Path(__file__).parent / "icons" / "favicon-32x32.png")
@app.get("/favicon-192x192.png", include_in_schema=False)
async def favicon_192():
return FileResponse(Path(__file__).parent / "icons" / "favicon-192x192.png")
def validate_upload(file: UploadFile) -> bool:
# Check size
file.file.seek(0, 2)
size = file.file.tell()
file.file.seek(0)
if size > MAX_FILE_SIZE:
raise ValueError(f"File too large (Limit: {MAX_FILE_SIZE//1024//1024}MB)")
# Magic number check for ZIPs
if file.filename.lower().endswith('.zip'):
header = file.file.read(4)
file.file.seek(0)
if header != b'PK\x03\x04':
raise ValueError("Invalid ZIP file header")
if file.content_type not in ALLOWED_TYPES:
# Fallback: check extension if content_type is generic
ext = Path(file.filename).suffix.lower()
if ext not in ['.zip', '.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']:
raise ValueError("Invalid file type")
# Scan for malicious content
return True
def sanitize_filename(filename: str) -> str:
clean = re.sub(r'[^a-zA-Z0-9_.-]', '_', filename)
return clean if clean else "upload.bin"
def safe_extract_zip(zip_path: Path, extract_path: Path):
"""Extract zip file with Zip Slip protection"""
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for member in zip_ref.infolist():
target_path = extract_path / member.filename
# Resolve paths to ensure they stay within extract_path
if not str(target_path.resolve()).startswith(str(extract_path.resolve())):
raise ValueError(f"Malicious zip member: {member.filename}")
zip_ref.extractall(extract_path)
@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
response = server_buddai.chat(request.message, force_model=request.model, forge_mode=request.forge_mode)
return {"response": response, "message_id": server_buddai.last_generated_id}
@app.websocket("/api/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
user_message = data.get("message")
user_id = data.get("user_id", "default")
model = data.get("model")
forge_mode = data.get("forge_mode", "2")
server_buddai = buddai_manager.get_instance(user_id)
for chunk in server_buddai.chat_stream(user_message, model, forge_mode):
await websocket.send_json({"type": "token", "content": chunk})
await websocket.send_json({"type": "end", "message_id": server_buddai.last_generated_id})
except WebSocketDisconnect:
pass
@app.post("/api/feedback")
async def feedback_endpoint(req: FeedbackRequest, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
new_response = server_buddai.record_feedback(req.message_id, req.positive, req.comment)
if new_response:
return {"status": "regenerated", "response": new_response, "message_id": server_buddai.last_generated_id}
return {"status": "success"}
@app.post("/api/system/reset-gpu")
async def reset_gpu_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
result = server_buddai.reset_gpu()
return {"message": result}
@app.get("/api/system/metrics")
async def metrics_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
return server_buddai.metrics.calculate_accuracy()
@app.get("/api/system/status")
async def system_status_endpoint():
mem_percent = 0
cpu_percent = 0
if psutil:
mem = psutil.virtual_memory()
mem_percent = mem.percent
cpu_percent = psutil.cpu_percent(interval=None)
return {"memory": mem_percent, "cpu": cpu_percent}
@app.get("/api/system/backup")
async def backup_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
success, path_or_err = server_buddai.create_backup()
if success:
return FileResponse(
path=path_or_err,
filename=Path(path_or_err).name,
media_type='application/x-sqlite3'
)
else:
return JSONResponse(status_code=500, content={"message": f"Backup failed: {path_or_err}"})
@app.get("/api/utils/qrcode")
async def qrcode_endpoint(url: str):
if not qrcode:
return JSONResponse(status_code=501, content={"message": "qrcode module missing"})
try:
img = qrcode.make(url)
buf = io.BytesIO()
img.save(buf, format="PNG")
buf.seek(0)
return Response(content=buf.getvalue(), media_type="image/png")
except Exception as e:
return JSONResponse(status_code=500, content={"message": f"QR Error: {str(e)}. Ensure 'pillow' is installed."})
@app.get("/api/history")
async def history_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
return {"history": server_buddai.context_messages}
@app.get("/api/sessions")
async def sessions_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
return {"sessions": server_buddai.get_sessions()}
@app.post("/api/session/load")
async def load_session_endpoint(req: SessionLoadRequest, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
history = server_buddai.load_session(req.session_id)
return {"history": history, "session_id": req.session_id}
@app.post("/api/session/rename")
async def rename_session_endpoint(req: SessionRenameRequest, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
server_buddai.rename_session(req.session_id, req.title)
return {"status": "success"}
@app.post("/api/session/delete")
async def delete_session_endpoint(req: SessionDeleteRequest, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
server_buddai.delete_session(req.session_id)
return {"status": "success"}
@app.get("/api/session/{session_id}/export/json")
async def export_json_endpoint(session_id: str, user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
data = server_buddai.get_session_export_data(session_id)
return JSONResponse(
content=data,
headers={"Content-Disposition": f"attachment; filename=session_{session_id}.json"}
)
@app.post("/api/session/import")
async def import_session_endpoint(file: UploadFile = File(...), user_id: str = Header("default")):
if not file.filename.lower().endswith('.json'):
return JSONResponse(status_code=400, content={"message": "Invalid file type. Must be JSON."})
content = await file.read()
try:
data = json.loads(content)
except json.JSONDecodeError:
return JSONResponse(status_code=400, content={"message": "Invalid JSON content."})
server_buddai = buddai_manager.get_instance(user_id)
try:
new_session_id = server_buddai.import_session_from_json(data)
return {"status": "success", "session_id": new_session_id, "message": f"Session imported as {new_session_id}"}
except ValueError as e:
return JSONResponse(status_code=400, content={"message": str(e)})
except Exception as e:
return JSONResponse(status_code=500, content={"message": f"Server error: {str(e)}"})
@app.post("/api/session/clear")
async def clear_session_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
server_buddai.clear_current_session()
return {"status": "success"}
@app.post("/api/session/new")
async def new_session_endpoint(user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
new_id = server_buddai.start_new_session()
return {"session_id": new_id}
@app.post("/api/upload")
async def upload_repo(file: UploadFile = File(...), user_id: str = Header("default")):
server_buddai = buddai_manager.get_instance(user_id)
try:
validate_upload(file)
uploads_dir = DATA_DIR / "uploads"
uploads_dir.mkdir(exist_ok=True)
# Enforce MAX_UPLOAD_FILES (Hardening)
existing_items = sorted(uploads_dir.iterdir(), key=lambda p: p.stat().st_mtime)
while len(existing_items) >= MAX_UPLOAD_FILES:
oldest = existing_items.pop(0)
if oldest.is_dir():
shutil.rmtree(oldest)
else:
oldest.unlink()
safe_name = sanitize_filename(file.filename)
file_location = uploads_dir / safe_name
with open(file_location, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
if safe_name.lower().endswith(".zip"):
extract_path = uploads_dir / file_location.stem
extract_path.mkdir(exist_ok=True)
safe_extract_zip(file_location, extract_path)
server_buddai.index_local_repositories(extract_path)
file_location.unlink() # Cleanup zip
return {"message": f"✅ Successfully indexed {safe_name}"}
else:
# Support single code files by moving them to a folder and indexing
if file_location.suffix.lower() in ['.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']:
target_dir = uploads_dir / file_location.stem
target_dir.mkdir(exist_ok=True)
final_path = target_dir / safe_name
shutil.move(str(file_location), str(final_path))
server_buddai.index_local_repositories(target_dir)
return {"message": f"✅ Successfully indexed {safe_name}"}
return {"message": f"✅ Successfully uploaded {safe_name}"}
except Exception as e:
return {"message": f"❌ Error: {str(e)}"}
def check_ollama() -> bool:
try:
conn = http.client.HTTPConnection(OLLAMA_HOST, OLLAMA_PORT, timeout=5)
conn.request("GET", "/api/tags")
response = conn.getresponse()
if response.status == 200:
data = json.loads(response.read().decode('utf-8'))
conn.close()
installed_models = [m['name'] for m in data.get('models', [])]
missing = [m for m in MODELS.values() if m not in installed_models]
if missing:
print(f"⚠️ WARNING: Missing models in Ollama: {', '.join(missing)}")
print(f" Run in host terminal: ollama pull {' && ollama pull '.join(missing)}")
return True
return False
except Exception:
return False
def is_port_available(port: int, host: str = "0.0.0.0") -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return True
except socket.error:
return False
def main() -> None:
if not check_ollama():
print(f"❌ Ollama not running at {OLLAMA_HOST}:{OLLAMA_PORT}. Ensure it is running and accessible.")
sys.exit(1)
parser = argparse.ArgumentParser(description="BuddAI Executive")
parser.add_argument("--server", action="store_true", help="Run in server mode")
parser.add_argument("--port", type=int, default=8000, help="Port for server mode")
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host IP address")
parser.add_argument("--public-url", type=str, default="", help="Public URL for QR codes")
args = parser.parse_args()
if args.server:
if SERVER_AVAILABLE:
port = args.port
if not is_port_available(port, args.host):
print(f"⚠️ Port {port} is in use.")
for i in range(1, 11):
if is_port_available(port + i, args.host):
port += i
print(f"🔄 Switching to available port: {port}")
break
else:
print(f"❌ Could not find available port in range {args.port}-{args.port+10}")
sys.exit(1)
# Silence health check logs from frontend polling
class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
msg = record.getMessage()
return "/api/system/status" not in msg and '"GET / HTTP/1.1" 200' not in msg
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
print(f"🚀 Starting BuddAI API Server on port {port}...")
if args.public_url:
print(f"🔗 Public Access: {args.public_url}")
app.state.public_url = args.public_url
uvicorn.run(app, host=args.host, port=port)
else:
print("❌ Server dependencies missing. Install: pip install fastapi uvicorn aiofiles python-multipart")
else:
buddai = BuddAI()
buddai.run()

53
buddai_shared.py Normal file
View file

@ -0,0 +1,53 @@
import os
import sqlite3
from pathlib import Path
import queue
import http.client
# Global Config
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "conversations.db"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "127.0.0.1")
OLLAMA_PORT = int(os.getenv("OLLAMA_PORT", "11434"))
# Shared Models
MODELS = {
"fast": "qwen2.5-coder:1.5b",
"balanced": "qwen2.5-coder:3b"
}
# Shared Connection Pool logic to avoid "port in use" or "too many connections" errors
class OllamaConnectionPool:
def __init__(self, host, port, max_size=10):
self.host = host
self.port = port
self.pool = queue.Queue(maxsize=max_size)
def get_connection(self):
try: return self.pool.get_nowait()
except: return http.client.HTTPConnection(self.host, self.port, timeout=90)
def return_connection(self, conn):
try: self.pool.put_nowait(conn)
except: conn.close()
OLLAMA_POOL = OllamaConnectionPool(OLLAMA_HOST, OLLAMA_PORT)
# Server Availability Check
try:
import fastapi
import uvicorn
SERVER_AVAILABLE = True
except ImportError:
SERVER_AVAILABLE = False
# Shared Patterns
COMPLEX_TRIGGERS = [
"multiple modules", "integrate", "combine", "modular", "state machine", "safety", "failsafe", "logic", "protocol", "integration"
]
MODULE_PATTERNS = {
"ble": ["ble", "bluetooth", "phone app", "remote"],
"servo": ["servo", "flipper", "arm", "mg996", "sg90"],
"motor": ["motor", "drive", "l298n", "movement", "wheels"],
"safety": ["safety", "timeout", "failsafe", "emergency"],
"battery": ["battery", "voltage", "power"],
"sensor": ["sensor", "distance", "proximity", "ultrasonic", "ir"]
}

43
decouple_buddai.py Normal file
View file

@ -0,0 +1,43 @@
import os
import re
def decouple_exocortex(source_file):
with open(source_file, 'r', encoding='utf-8') as f:
content = f.read()
# Define the file splits based on class/block signatures
splits = {
"buddai_memory.py": ["class ShadowSuggestionEngine", "class AdaptiveLearner", "class SmartLearner"],
"buddai_logic.py": ["class CodeValidator", "class HardwareProfile", "class LearningMetrics"],
"buddai_executive.py": ["class OllamaConnectionPool", "class BuddAI", "class ModelFineTuner"],
"buddai_server.py": ["if SERVER_AVAILABLE:", "app = FastAPI", "class BuddAIManager"]
}
print(f"🚀 Surgical extraction of {source_file} initiated...")
# Extraction logic for classes/blocks
for filename, markers in splits.items():
extracted_sections = []
for marker in markers:
# Simple extraction based on class indentation/block end
pattern = re.compile(rf"{re.escape(marker)}.*?(?=\nclass |\nif __name__ ==|\nif SERVER_AVAILABLE)", re.DOTALL)
match = pattern.search(content)
if match:
extracted_sections.append(match.group(0))
if extracted_sections:
with open(filename, 'w', encoding='utf-8') as f:
f.write("#!/usr/bin/env python3\n")
f.write("import sys, os, json, logging, sqlite3, datetime, pathlib, http.client, re, typing, zipfile, shutil, queue, socket, argparse, io, difflib\n")
f.write("from pathlib import Path\nfrom datetime import datetime, timedelta\nfrom typing import Optional, List, Dict, Tuple, Union, Generator\n\n")
f.write("try:\n from fastapi import FastAPI, File, Header, Response, UploadFile, WebSocketDisconnect, Request, WebSocket\n from fastapi.middleware.cors import CORSMiddleware\n from fastapi.responses import FileResponse, HTMLResponse, JSONResponse\n from fastapi.staticfiles import StaticFiles\n from pydantic import BaseModel\n import uvicorn\nexcept ImportError:\n pass\n\n")
f.write("\n\n".join(extracted_sections))
print(f"✅ Created {filename}")
if __name__ == "__main__":
# Use the script's directory to find main.py reliably
source_path = os.path.join(os.path.dirname(__file__), "main.py")
if os.path.exists(source_path):
decouple_exocortex(source_path)
else:
print(f"❌ Error: Could not find {source_path}")

File diff suppressed because it is too large Load diff

2618
main.py

File diff suppressed because it is too large Load diff