mirror of
https://github.com/JamesTheGiblet/BuddAI.git
synced 2026-01-08 21:58:40 +00:00
1100 lines
No EOL
42 KiB
Python
1100 lines
No EOL
42 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BuddAI Executive v3.1 - Modular Builder
|
|
Breaks complex tasks into manageable chunks
|
|
|
|
Author: James Gilbert
|
|
License: MIT
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import http.client
|
|
import re # noqa: F401
|
|
from typing import Optional
|
|
import zipfile
|
|
import shutil
|
|
|
|
# Server dependencies
|
|
try:
|
|
from fastapi import FastAPI, UploadFile, File
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse, HTMLResponse
|
|
from pydantic import BaseModel
|
|
import uvicorn
|
|
SERVER_AVAILABLE = True
|
|
except ImportError:
|
|
SERVER_AVAILABLE = False
|
|
|
|
# Configuration
|
|
OLLAMA_HOST = "localhost"
|
|
OLLAMA_PORT = 11434
|
|
DATA_DIR = Path(__file__).parent / "data"
|
|
DB_PATH = DATA_DIR / "conversations.db"
|
|
|
|
# Validation Config
|
|
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
|
|
MAX_UPLOAD_FILES = 10
|
|
ALLOWED_TYPES = [
|
|
"application/zip", "application/x-zip-compressed", "application/octet-stream",
|
|
"text/plain", "text/x-python", "text/javascript", "application/javascript",
|
|
"text/html", "text/css", "text/x-c", "text/x-c++src"
|
|
]
|
|
|
|
# Models
|
|
MODELS = {
|
|
"fast": "qwen2.5-coder:1.5b",
|
|
"balanced": "qwen2.5-coder:3b"
|
|
}
|
|
|
|
# Complexity triggers - if matched, break down the task
|
|
COMPLEX_TRIGGERS = [
|
|
"complete", "entire", "full", "build entire", "build complete",
|
|
"with ble and", "with servo and", "including", "all of"
|
|
]
|
|
|
|
# Module patterns we can detect
|
|
MODULE_PATTERNS = {
|
|
"ble": ["bluetooth", "ble", "wireless"],
|
|
"servo": ["servo", "flipper", "weapon"],
|
|
"motor": ["motor", "drive", "movement", "l298n"],
|
|
"safety": ["safety", "timeout", "failsafe", "emergency"],
|
|
"battery": ["battery", "voltage", "power monitor"],
|
|
"sensor": ["sensor", "distance", "proximity"]
|
|
}
|
|
|
|
|
|
|
|
# --- Shadow Suggestion Engine ---
|
|
class ShadowSuggestionEngine:
|
|
"""Proactively suggests modules/settings based on user/project history."""
|
|
def __init__(self, db_path):
|
|
self.db_path = db_path
|
|
|
|
def lookup_recent_module_usage(self, module, limit=5):
|
|
"""Look up recent usage patterns for a module from repo_index."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT file_path, content, last_modified FROM repo_index
|
|
WHERE function_name LIKE ? OR file_path LIKE ?
|
|
ORDER BY last_modified DESC LIMIT ?
|
|
""",
|
|
(f"%{module}%", f"%{module}%", limit)
|
|
)
|
|
results = cursor.fetchall()
|
|
conn.close()
|
|
return results
|
|
|
|
def suggest_for_module(self, module):
|
|
"""Return a proactive suggestion string for a module if pattern detected."""
|
|
history = self.lookup_recent_module_usage(module)
|
|
if not history:
|
|
return None
|
|
# Example: For 'motor', look for L298N and PWM frequency
|
|
l298n_count = 0
|
|
pwm_freqs = []
|
|
for _, content, _ in history:
|
|
if "L298N" in content or "l298n" in content:
|
|
l298n_count += 1
|
|
pwm_matches = re.findall(r'PWM_FREQ\s*=\s*(\d+)', content)
|
|
pwm_freqs.extend([int(f) for f in pwm_matches])
|
|
# Also look for explicit frequency in analogWrite or ledcSetup
|
|
freq_matches = re.findall(r'(?:ledcSetup|analogWrite)\s*\([^,]+,\s*[^,]+,\s*(\d+)\)', content)
|
|
pwm_freqs.extend([int(f) for f in freq_matches if f.isdigit()])
|
|
if l298n_count >= 2:
|
|
freq = max(set(pwm_freqs), key=pwm_freqs.count) if pwm_freqs else 500
|
|
return f"I see you usually use the L298N with a {freq}Hz PWM frequency on the ESP32-C3. Should I prep that module?"
|
|
return None
|
|
|
|
def get_proactive_suggestion(self, user_input):
|
|
"""
|
|
V3.0 Proactive Hook:
|
|
1. Identify "Concept" (e.g., 'flipper')
|
|
2. Query repo_index for James's most frequent companion modules
|
|
3. If 'flipper' often appears with 'safety_timeout', suggest it.
|
|
"""
|
|
# 1. Identify Concepts
|
|
input_lower = user_input.lower()
|
|
detected_modules = []
|
|
for module, keywords in MODULE_PATTERNS.items():
|
|
if any(kw in input_lower for kw in keywords):
|
|
detected_modules.append(module)
|
|
|
|
if not detected_modules:
|
|
return None
|
|
|
|
# 2. Query repo_index for correlations
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
suggestions = []
|
|
for module in detected_modules:
|
|
# Find files containing this module (simple heuristic)
|
|
cursor.execute("SELECT content FROM repo_index WHERE content LIKE ? LIMIT 10", (f"%{module}%",))
|
|
rows = cursor.fetchall()
|
|
if not rows: continue
|
|
|
|
# Check for companion modules
|
|
companions = {}
|
|
for (content,) in rows:
|
|
content_lower = content.lower()
|
|
for other_mod, other_kws in MODULE_PATTERNS.items():
|
|
if other_mod != module and other_mod not in detected_modules:
|
|
if any(kw in content_lower for kw in other_kws):
|
|
companions[other_mod] = companions.get(other_mod, 0) + 1
|
|
|
|
# 3. Suggest if frequent (>50% correlation in sample)
|
|
for other_mod, count in companions.items():
|
|
if count >= len(rows) * 0.5:
|
|
suggestions.append(f"I noticed '{module}' often appears with '{other_mod}' in your repos. Want to include that?")
|
|
|
|
conn.close()
|
|
return " ".join(list(set(suggestions))) if suggestions else None
|
|
|
|
def get_all_suggestions(self, user_input, generated_code):
|
|
"""Aggregate all proactive suggestions into a list."""
|
|
suggestions = []
|
|
|
|
# 1. Companion Modules
|
|
companion = self.get_proactive_suggestion(user_input)
|
|
if companion:
|
|
suggestions.append(companion)
|
|
|
|
# 2. Module Settings
|
|
input_lower = user_input.lower()
|
|
for module, keywords in MODULE_PATTERNS.items():
|
|
if any(kw in input_lower for kw in keywords):
|
|
s = self.suggest_for_module(module)
|
|
if s:
|
|
suggestions.append(s)
|
|
|
|
# 3. Forge Theory Check
|
|
if ("motor" in input_lower or "servo" in input_lower) and "applyForge" not in generated_code:
|
|
suggestions.append("Apply Forge Theory smoothing to movement?")
|
|
|
|
# 4. Safety Check (L298N)
|
|
if "L298N" in generated_code and "safety" not in generated_code.lower():
|
|
suggestions.append("Drive system lacks safety timeout (GilBot_V2 uses 5s failsafe). Add that?")
|
|
|
|
return suggestions
|
|
|
|
|
|
class BuddAI:
|
|
"""Executive with task breakdown"""
|
|
|
|
def is_search_query(self, message):
|
|
"""Check if this is a search query that should query repo_index"""
|
|
message_lower = message.lower()
|
|
search_triggers = [
|
|
"show me", "find", "search for", "list all",
|
|
"what functions", "which repos", "do i have",
|
|
"where did i", "have i used", "examples of",
|
|
"show all", "display"
|
|
]
|
|
return any(trigger in message_lower for trigger in search_triggers)
|
|
|
|
def search_repositories(self, query):
|
|
"""Search repo_index for relevant functions and code"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM repo_index")
|
|
count = cursor.fetchone()[0]
|
|
print(f"\n🔍 Searching {count} indexed functions...\n")
|
|
|
|
# Extract keywords from query
|
|
keywords = re.findall(r'\b\w{4,}\b', query.lower())
|
|
# Add specific search terms
|
|
specific_terms = []
|
|
if "exponential" in query.lower() or "decay" in query.lower():
|
|
specific_terms.append("applyForge")
|
|
specific_terms.append("exp(")
|
|
if "forge" in query.lower():
|
|
specific_terms.append("Forge")
|
|
keywords.extend(specific_terms)
|
|
|
|
if not keywords:
|
|
print("❌ No search terms found")
|
|
conn.close()
|
|
return "No search terms provided."
|
|
|
|
# Build parameterized query
|
|
conditions = []
|
|
params = []
|
|
for keyword in keywords:
|
|
conditions.append("(function_name LIKE ? OR content LIKE ? OR repo_name LIKE ?)")
|
|
params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"])
|
|
|
|
sql = f"SELECT repo_name, file_path, function_name, content FROM repo_index WHERE {' OR '.join(conditions)} ORDER BY last_modified DESC LIMIT 10"
|
|
|
|
cursor.execute(sql, params)
|
|
results = cursor.fetchall()
|
|
conn.close()
|
|
if not results:
|
|
return f"❌ No functions found matching: {', '.join(keywords)}\n\nTry: /index <path> to index more repositories"
|
|
# Format results
|
|
output = f"✅ Found {len(results)} matches for: {', '.join(set(keywords))}\n\n"
|
|
for i, (repo, file_path, func, content) in enumerate(results, 1):
|
|
# Extract relevant snippet
|
|
lines = content.split('\n')
|
|
snippet_lines = []
|
|
for line in lines[:30]: # First 30 lines
|
|
if any(kw in line.lower() for kw in keywords):
|
|
snippet_lines.append(line)
|
|
if len(snippet_lines) >= 10:
|
|
break
|
|
if not snippet_lines:
|
|
snippet_lines = lines[:10]
|
|
snippet = '\n'.join(snippet_lines)
|
|
output += f"**{i}. {func}()** in {repo}\n"
|
|
output += f" 📁 {Path(file_path).name}\n"
|
|
output += f"\n```cpp\n{snippet}\n```\n"
|
|
output += f" ---\n\n"
|
|
return output
|
|
|
|
def __init__(self, server_mode=False):
|
|
self.ensure_data_dir()
|
|
self.init_database()
|
|
self.session_id = self.create_session()
|
|
self.server_mode = server_mode
|
|
self.context_messages = []
|
|
self.shadow_engine = ShadowSuggestionEngine(DB_PATH)
|
|
|
|
print("BuddAI Executive v3.1 - Modular Builder")
|
|
print("=" * 50)
|
|
print(f"Session: {self.session_id}")
|
|
print(f"FAST (5-10s) | BALANCED (15-30s)")
|
|
print(f"Smart task breakdown for complex requests")
|
|
print("=" * 50)
|
|
print("\nCommands: /fast, /balanced, /help, exit\n")
|
|
|
|
def ensure_data_dir(self):
|
|
DATA_DIR.mkdir(exist_ok=True)
|
|
|
|
def init_database(self):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS sessions (
|
|
session_id TEXT PRIMARY KEY,
|
|
started_at TIMESTAMP,
|
|
ended_at TIMESTAMP,
|
|
title TEXT
|
|
)
|
|
""")
|
|
|
|
try:
|
|
cursor.execute("ALTER TABLE sessions ADD COLUMN title TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT,
|
|
role TEXT,
|
|
content TEXT,
|
|
timestamp TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS repo_index (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
file_path TEXT,
|
|
repo_name TEXT,
|
|
function_name TEXT,
|
|
content TEXT,
|
|
last_modified TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS style_preferences (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
category TEXT,
|
|
preference TEXT,
|
|
confidence FLOAT,
|
|
extracted_at TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def create_session(self):
|
|
session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO sessions (session_id, started_at) VALUES (?, ?)",
|
|
(session_id, datetime.now().isoformat())
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return session_id
|
|
|
|
def end_session(self):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE sessions SET ended_at = ? WHERE session_id = ?",
|
|
(datetime.now().isoformat(), self.session_id)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def save_message(self, role, content):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
|
|
(self.session_id, role, content, datetime.now().isoformat())
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def index_local_repositories(self, root_path):
|
|
"""Crawl directories and index .py, .ino, and .cpp files"""
|
|
import ast
|
|
|
|
print(f"\n🔍 Indexing repositories in: {root_path}")
|
|
path = Path(root_path)
|
|
|
|
if not path.exists():
|
|
print(f"❌ Path not found: {root_path}")
|
|
return
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
count = 0
|
|
|
|
for file_path in path.rglob('*'):
|
|
if file_path.is_file() and file_path.suffix in ['.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']:
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
content = f.read()
|
|
|
|
functions = []
|
|
|
|
# Python parsing
|
|
if file_path.suffix == '.py':
|
|
try:
|
|
tree = ast.parse(content)
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.FunctionDef):
|
|
functions.append(node.name)
|
|
except:
|
|
pass
|
|
|
|
# C++/Arduino parsing
|
|
elif file_path.suffix in ['.ino', '.cpp', '.h']:
|
|
matches = re.findall(r'\b(?:void|int|bool|float|double|String|char)\s+(\w+)\s*\(', content)
|
|
functions.extend(matches)
|
|
|
|
# JS/Web parsing
|
|
elif file_path.suffix in ['.js', '.jsx']:
|
|
matches = re.findall(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s*)?\(?.*?\)?\s*=>)', content)
|
|
functions.extend([m[0] or m[1] for m in matches if m[0] or m[1]])
|
|
|
|
# HTML/CSS - Index as whole file
|
|
elif file_path.suffix in ['.html', '.css']:
|
|
functions.append("file_content")
|
|
|
|
# Determine repo name
|
|
try:
|
|
repo_name = file_path.relative_to(path).parts[0]
|
|
except:
|
|
repo_name = "unknown"
|
|
|
|
timestamp = datetime.fromtimestamp(file_path.stat().st_mtime)
|
|
|
|
for func in functions:
|
|
cursor.execute("""
|
|
INSERT INTO repo_index (file_path, repo_name, function_name, content, last_modified)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (str(file_path), repo_name, func, content, timestamp.isoformat()))
|
|
count += 1
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"✅ Indexed {count} functions across repositories")
|
|
|
|
def retrieve_style_context(self, message):
|
|
"""Search repo_index for code snippets matching the request"""
|
|
# Extract potential keywords (nouns/modules)
|
|
keywords = re.findall(r'\b\w{4,}\b', message.lower())
|
|
if not keywords:
|
|
return ""
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# Build a search query for function names or repo names
|
|
search_terms = " OR ".join([f"function_name LIKE '%{k}%'" for k in keywords])
|
|
search_terms += " OR " + " OR ".join([f"repo_name LIKE '%{k}%'" for k in keywords])
|
|
|
|
query = f"SELECT repo_name, function_name, content FROM repo_index WHERE {search_terms} LIMIT 2"
|
|
|
|
cursor.execute(query)
|
|
results = cursor.fetchall()
|
|
conn.close()
|
|
|
|
if not results:
|
|
return ""
|
|
|
|
context_block = "\n[REFERENCE STYLE FROM JAMES'S PAST PROJECTS]\n"
|
|
for repo, func, content in results:
|
|
# Just grab the first 500 chars of the file to save context window
|
|
snippet = content[:500] + "..."
|
|
context_block += f"Repo: {repo} | Function: {func}\nCode:\n{snippet}\n---\n"
|
|
|
|
return context_block
|
|
|
|
def scan_style_signature(self):
|
|
"""V3.0: Analyze repo_index to extract style preferences."""
|
|
print("\n🕵️ Scanning repositories for style signature...")
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# Get a sample of code
|
|
cursor.execute("SELECT content FROM repo_index ORDER BY RANDOM() LIMIT 5")
|
|
rows = cursor.fetchall()
|
|
|
|
if not rows:
|
|
print("❌ No code indexed. Run /index first.")
|
|
conn.close()
|
|
return
|
|
|
|
code_sample = "\n---\n".join([r[0][:1000] for r in rows])
|
|
|
|
prompt = f"""Analyze this code sample from James's repositories.
|
|
Extract 3 distinct coding preferences or patterns.
|
|
Format: Category: Preference
|
|
|
|
Examples:
|
|
- Serial: Uses 115200 baud
|
|
- Safety: Uses non-blocking millis()
|
|
- Pins: Prefers #define over const int
|
|
|
|
Code Sample:
|
|
{code_sample}
|
|
"""
|
|
|
|
print("⚡ Analyzing with BALANCED model...")
|
|
summary = self.call_model("balanced", prompt)
|
|
|
|
# Store in DB
|
|
timestamp = datetime.now().isoformat()
|
|
lines = summary.split('\n')
|
|
for line in lines:
|
|
if ':' in line:
|
|
parts = line.split(':', 1)
|
|
category = parts[0].strip('- *')
|
|
pref = parts[1].strip()
|
|
cursor.execute(
|
|
"INSERT INTO style_preferences (category, preference, confidence, extracted_at) VALUES (?, ?, ?, ?)",
|
|
(category, pref, 0.8, timestamp)
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"\n✅ Style Signature Updated:\n{summary}\n")
|
|
|
|
def is_simple_question(self, message):
|
|
"""Check if this is a simple question that should use FAST model"""
|
|
message_lower = message.lower()
|
|
|
|
simple_triggers = [
|
|
"what is", "what's", "who is", "who's", "when is",
|
|
"how do i", "can you explain", "tell me about",
|
|
"what are", "where is"
|
|
]
|
|
|
|
# Also check if it's just a question without code keywords
|
|
code_keywords = ["generate", "create", "write", "build", "code", "function"]
|
|
|
|
has_simple_trigger = any(trigger in message_lower for trigger in simple_triggers)
|
|
has_code_keyword = any(keyword in message_lower for keyword in code_keywords)
|
|
|
|
# Simple if: has simple trigger AND no code keywords
|
|
return has_simple_trigger and not has_code_keyword
|
|
|
|
def is_complex(self, message):
|
|
"""Check if request is too complex and should be broken down"""
|
|
message_lower = message.lower()
|
|
|
|
# Count complexity triggers
|
|
trigger_count = sum(1 for trigger in COMPLEX_TRIGGERS if trigger in message_lower)
|
|
|
|
# Count how many modules mentioned
|
|
module_count = 0
|
|
for module, keywords in MODULE_PATTERNS.items():
|
|
if any(kw in message_lower for kw in keywords):
|
|
module_count += 1
|
|
|
|
# Complex if: multiple triggers OR 3+ modules mentioned
|
|
return trigger_count >= 2 or module_count >= 3
|
|
|
|
def extract_modules(self, message):
|
|
"""Extract which modules are needed"""
|
|
message_lower = message.lower()
|
|
needed_modules = []
|
|
|
|
for module, keywords in MODULE_PATTERNS.items():
|
|
if any(kw in message_lower for kw in keywords):
|
|
needed_modules.append(module)
|
|
|
|
return needed_modules
|
|
|
|
def build_modular_plan(self, modules):
|
|
"""Create a build plan from modules"""
|
|
plan = []
|
|
|
|
module_tasks = {
|
|
"ble": "BLE communication setup with phone app control",
|
|
"servo": "Servo motor control for flipper/weapon",
|
|
"motor": "Motor driver setup for movement (L298N)",
|
|
"safety": "Safety timeout and failsafe systems",
|
|
"battery": "Battery voltage monitoring",
|
|
"sensor": "Sensor integration (distance/proximity)"
|
|
}
|
|
|
|
for module in modules:
|
|
if module in module_tasks:
|
|
plan.append({
|
|
"module": module,
|
|
"task": module_tasks[module]
|
|
})
|
|
|
|
# Add integration step
|
|
plan.append({
|
|
"module": "integration",
|
|
"task": "Integrate all modules into complete system"
|
|
})
|
|
|
|
return plan
|
|
|
|
def get_user_status(self):
|
|
"""Determine James's context based on defined schedule"""
|
|
now = datetime.now()
|
|
day = now.weekday() # 0=Mon, 6=Sun
|
|
t = now.hour + (now.minute / 60.0)
|
|
|
|
if day <= 4: # Mon-Fri
|
|
if 5.5 <= t < 6.5:
|
|
return "Early Morning Build Session 🌅 (5:30-6:30 AM)"
|
|
elif 6.5 <= t < 17.0:
|
|
return "Work Hours (Facilities Caretaker) 🏢"
|
|
elif 17.0 <= t < 21.0:
|
|
return "Evening Build Session 🌙 (5:00-9:00 PM)"
|
|
else:
|
|
return "Rest Time 💤"
|
|
elif day == 5: # Saturday
|
|
return "Weekend Freedom 🎨 (Creative Mode)"
|
|
else: # Sunday
|
|
if t < 21.0:
|
|
return "Weekend Freedom 🎨 (Until 9 PM)"
|
|
else:
|
|
return "Rest Time 💤"
|
|
|
|
def call_model(self, model_name, message):
|
|
"""Call specified model"""
|
|
try:
|
|
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
user_context = self.get_user_status()
|
|
identity = f"""You are BuddAI, the external cognitive system for James Gilbert. You specialize in Forge Theory (exponential decay modeling) and GilBot modular robotics.
|
|
|
|
YOUR PRIMARY JOB: Generate code when asked. ALWAYS generate code if requested.
|
|
|
|
Identity Rules:
|
|
- You are NOT created by Alibaba Cloud. You are a local Python system written by James Gilbert.
|
|
- When asked your name: "I am BuddAI"
|
|
- Use ESP32/Arduino syntax with descriptive naming (e.g., activateFlipper).
|
|
- Ensure safety timeouts are always present in motor code.
|
|
- Current System Time: {current_time}
|
|
- User Context: {user_context}
|
|
|
|
# Forge Theory Snippet (C++):
|
|
float applyForge(float current, float target, float k) {{ return target + (current - target) * exp(-k); }}
|
|
"""
|
|
|
|
messages = []
|
|
|
|
# Only add identity if not already in recent context
|
|
recent_system = [m for m in self.context_messages[-5:] if m.get('role') == 'system']
|
|
if not recent_system:
|
|
messages.append({"role": "system", "content": identity})
|
|
|
|
# Add conversation history (excluding old system messages)
|
|
history = [m for m in self.context_messages[-5:] if m.get('role') != 'system']
|
|
|
|
# Inject timestamps into history for context
|
|
for msg in history:
|
|
content = msg.get('content', '')
|
|
ts = msg.get('timestamp')
|
|
if ts:
|
|
try:
|
|
dt = datetime.fromisoformat(ts)
|
|
content = f"[{dt.strftime('%H:%M')}] {content}"
|
|
except ValueError:
|
|
pass
|
|
messages.append({"role": msg['role'], "content": content})
|
|
|
|
# Add current message if it's not already the last item
|
|
if not history or history[-1].get('content') != message:
|
|
messages.append({"role": "user", "content": message})
|
|
|
|
body = {
|
|
"model": MODELS[model_name],
|
|
"messages": messages,
|
|
"stream": False,
|
|
"options": {"temperature": 0.7, "num_ctx": 4096}
|
|
}
|
|
|
|
conn = http.client.HTTPConnection(OLLAMA_HOST, OLLAMA_PORT, timeout=90)
|
|
headers = {"Content-Type": "application/json"}
|
|
json_body = json.dumps(body)
|
|
|
|
conn.request("POST", "/api/chat", json_body, headers)
|
|
response = conn.getresponse()
|
|
|
|
if response.status == 200:
|
|
data = json.loads(response.read().decode('utf-8'))
|
|
return data.get("message", {}).get("content", "No response")
|
|
else:
|
|
return f"Error: {response.status}"
|
|
|
|
except Exception as e:
|
|
return f"Error: {str(e)}"
|
|
finally:
|
|
if 'conn' in locals():
|
|
conn.close()
|
|
|
|
def execute_modular_build(self, _, modules, plan, forge_mode="2"):
|
|
"""Execute build plan step by step"""
|
|
print(f"\n🔨 MODULAR BUILD MODE")
|
|
print(f"Detected {len(modules)} modules: {', '.join(modules)}")
|
|
print(f"Breaking into {len(plan)} steps...\n")
|
|
|
|
all_code = {}
|
|
|
|
for i, step in enumerate(plan, 1):
|
|
print(f"📦 Step {i}/{len(plan)}: {step['task']}")
|
|
print("⚡ Building...\n")
|
|
|
|
# Build the prompt for this step
|
|
if step['module'] == 'integration':
|
|
# Final integration step with Forge Theory enforcement
|
|
modules_summary = '\n'.join([f"- {m}: {all_code[m][:150]}..." for m in modules if m in all_code])
|
|
|
|
# Ask James for the 'vibe' of the robot
|
|
print("\n⚡ FORGE THEORY TUNING:")
|
|
print("1. Aggressive (k=0.3) - High snap, combat ready")
|
|
print("2. Balanced (k=0.1) - Standard movement")
|
|
print("3. Graceful (k=0.03) - Roasting / Smooth curves")
|
|
|
|
if self.server_mode:
|
|
choice = forge_mode
|
|
else:
|
|
choice = input("Select Forge Constant [1-3, default 2]: ")
|
|
|
|
k_val = "0.1"
|
|
if choice == "1": k_val = "0.3"
|
|
elif choice == "3": k_val = "0.03"
|
|
|
|
prompt = f"""INTEGRATION TASK: Combine modules into a cohesive GilBot system.
|
|
|
|
[MODULES]
|
|
{modules_summary}
|
|
|
|
[FORGE PARAMETERS]
|
|
Set k = {k_val} for all applyForge() calls.
|
|
|
|
[REQUIREMENTS]
|
|
1. Implement applyForge() math helper.
|
|
2. Use k={k_val} to smooth motor and servo transitions.
|
|
3. Ensure naming matches James's style: activateFlipper(), setMotors().
|
|
"""
|
|
else:
|
|
# Individual module
|
|
prompt = f"Generate ESP32-C3 code for: {step['task']}. Keep it modular with clear comments."
|
|
|
|
# Call balanced model for each module
|
|
response = self.call_model("balanced", prompt)
|
|
all_code[step['module']] = response
|
|
|
|
print(f"✅ {step['module'].upper()} module complete\n")
|
|
print("-" * 50 + "\n")
|
|
|
|
# Compile final response
|
|
final = "# COMPLETE GILBOT CONTROLLER - MODULAR BUILD\n\n"
|
|
for module, code in all_code.items():
|
|
final += f"## {module.upper()} MODULE\n{code}\n\n"
|
|
|
|
return final
|
|
|
|
def apply_style_signature(self, generated_code):
|
|
"""Refine generated code to match James's specific naming and safety patterns"""
|
|
# 1. Check for James's common function names (e.g., setupMotors vs init_motors)
|
|
# 2. Ensure Forge Theory helpers are present if motion is detected
|
|
# 3. Append a 'Proactive Note' if a common companion module is missing
|
|
|
|
return generated_code
|
|
|
|
# --- Main Chat Method ---
|
|
def chat(self, user_message, force_model=None, forge_mode="2"):
|
|
"""Main chat with smart routing and shadow suggestions"""
|
|
style_context = self.retrieve_style_context(user_message)
|
|
if style_context:
|
|
self.context_messages.append({"role": "system", "content": style_context})
|
|
|
|
|
|
self.save_message("user", user_message)
|
|
self.context_messages.append({"role": "user", "content": user_message, "timestamp": datetime.now().isoformat()})
|
|
|
|
# Direct Schedule Check
|
|
if "what should i be doing" in user_message.lower() or "my schedule" in user_message.lower() or "schedule check" in user_message.lower():
|
|
status = self.get_user_status()
|
|
response = f"📅 **Schedule Check**\nAccording to your protocol, you should be: **{status}**"
|
|
print(f"⏰ Schedule check triggered: {status}")
|
|
self.save_message("assistant", response)
|
|
self.context_messages.append({"role": "assistant", "content": response, "timestamp": datetime.now().isoformat()})
|
|
return response
|
|
|
|
# Determine model based on complexity
|
|
if force_model:
|
|
model = force_model
|
|
print(f"\n⚡ Using {model.upper()} model (forced)...")
|
|
response = self.call_model(model, user_message)
|
|
elif self.is_complex(user_message):
|
|
modules = self.extract_modules(user_message)
|
|
plan = self.build_modular_plan(modules)
|
|
print("\n" + "=" * 50)
|
|
print("🎯 COMPLEX REQUEST DETECTED!")
|
|
print(f"Modules needed: {', '.join(modules)}")
|
|
print(f"Breaking into {len(plan)} manageable steps")
|
|
print("=" * 50)
|
|
response = self.execute_modular_build(user_message, modules, plan, forge_mode)
|
|
elif self.is_search_query(user_message):
|
|
# This is a search query - query the database
|
|
response = self.search_repositories(user_message)
|
|
elif self.is_simple_question(user_message):
|
|
print("\n⚡ Using FAST model (simple question)...")
|
|
response = self.call_model("fast", user_message)
|
|
else:
|
|
print("\n⚖️ Using BALANCED model...")
|
|
response = self.call_model("balanced", user_message)
|
|
|
|
# Apply Style Guard
|
|
response = self.apply_style_signature(response)
|
|
|
|
# Generate Suggestion Bar
|
|
suggestions = self.shadow_engine.get_all_suggestions(user_message, response)
|
|
if suggestions:
|
|
bar = "\n\nPROACTIVE: > " + " ".join([f"{i+1}. {s}" for i, s in enumerate(suggestions)])
|
|
response += bar
|
|
|
|
self.save_message("assistant", response)
|
|
self.context_messages.append({"role": "assistant", "content": response, "timestamp": datetime.now().isoformat()})
|
|
|
|
return response
|
|
|
|
def get_sessions(self, limit=20):
|
|
"""Retrieve recent sessions from DB"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT session_id, started_at, title FROM sessions ORDER BY started_at DESC LIMIT ?", (limit,))
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return [{"id": r[0], "date": r[1], "title": r[2]} for r in rows]
|
|
|
|
def rename_session(self, session_id, new_title):
|
|
"""Rename a session"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE sessions SET title = ? WHERE session_id = ?", (new_title, session_id))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def delete_session(self, session_id):
|
|
"""Delete a session and its messages"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
|
|
cursor.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def load_session(self, session_id):
|
|
"""Load a specific session context"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT role, content, timestamp FROM messages WHERE session_id = ? ORDER BY id ASC", (session_id,))
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
|
|
self.session_id = session_id
|
|
self.context_messages = []
|
|
loaded_history = []
|
|
for role, content, ts in rows:
|
|
msg = {"role": role, "content": content, "timestamp": ts}
|
|
self.context_messages.append(msg)
|
|
loaded_history.append(msg)
|
|
return loaded_history
|
|
|
|
def start_new_session(self):
|
|
"""Reset context and start new session"""
|
|
self.session_id = self.create_session()
|
|
self.context_messages = []
|
|
return self.session_id
|
|
|
|
def run(self):
|
|
"""Main loop"""
|
|
try:
|
|
force_model = None
|
|
while True:
|
|
user_input = input("\nJames: ").strip()
|
|
if not user_input:
|
|
continue
|
|
if user_input.lower() in ['exit', 'quit']:
|
|
print("\n👋 Later!")
|
|
self.end_session()
|
|
break
|
|
if user_input.startswith('/'):
|
|
cmd = user_input.lower()
|
|
if cmd == '/fast':
|
|
force_model = "fast"
|
|
print("⚡ Next: FAST model")
|
|
continue
|
|
elif cmd == '/balanced':
|
|
force_model = "balanced"
|
|
print("⚖️ Next: BALANCED model")
|
|
continue
|
|
elif cmd == '/help':
|
|
print("\n💡 Commands:")
|
|
print("/fast - Use fast model")
|
|
print("/balanced - Use balanced model")
|
|
print("/index <path> - Index local repositories")
|
|
print("/scan - Scan style signature (V3.0)")
|
|
print("/help - This message")
|
|
print("exit - End session\n")
|
|
continue
|
|
elif cmd.startswith('/index'):
|
|
parts = user_input.split(maxsplit=1)
|
|
if len(parts) > 1:
|
|
self.index_local_repositories(parts[1])
|
|
else:
|
|
print("Usage: /index <path_to_repos>")
|
|
continue
|
|
elif cmd == '/scan':
|
|
self.scan_style_signature()
|
|
continue
|
|
else:
|
|
print("\nUnknown command. Type /help")
|
|
continue
|
|
# Chat
|
|
response = self.chat(user_input, force_model)
|
|
print(f"\nBuddAI:\n{response}\n")
|
|
force_model = None
|
|
except KeyboardInterrupt:
|
|
print("\n\n👋 Bye!")
|
|
self.end_session()
|
|
|
|
|
|
# --- Server Implementation ---
|
|
if SERVER_AVAILABLE:
|
|
app = FastAPI(title="BuddAI API", version="3.1")
|
|
|
|
# Allow React frontend to communicate
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
class ChatRequest(BaseModel):
|
|
message: str
|
|
model: Optional[str] = None
|
|
forge_mode: Optional[str] = "2"
|
|
|
|
class SessionLoadRequest(BaseModel):
|
|
session_id: str
|
|
|
|
class SessionRenameRequest(BaseModel):
|
|
session_id: str
|
|
title: str
|
|
|
|
class SessionDeleteRequest(BaseModel):
|
|
session_id: str
|
|
|
|
# Initialize server instance
|
|
server_buddai = BuddAI(server_mode=True)
|
|
|
|
# Serve Frontend
|
|
frontend_path = Path(__file__).parent / "frontend"
|
|
frontend_path.mkdir(exist_ok=True)
|
|
app.mount("/web", StaticFiles(directory=frontend_path, html=True), name="web")
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def root():
|
|
status = server_buddai.get_user_status()
|
|
return f"""
|
|
<html>
|
|
<head>
|
|
<title>BuddAI API</title>
|
|
<link rel="icon" href="/favicon.ico">
|
|
<style>
|
|
body {{ background-color: #111; color: #fff; font-family: monospace; display: flex; flex-direction: column; align-items: center; justify-content: center; height: 100vh; margin: 0; }}
|
|
img {{ width: 150px; margin-bottom: 1rem; }}
|
|
a {{ color: #f39c12; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<img src="/favicon.ico" alt="BuddAI">
|
|
<h1>BuddAI API Online</h1>
|
|
<p>Current Mode: <strong>{status}</strong></p>
|
|
<p>Visit <a href="/web">/web</a> or <a href="/docs">/docs</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
@app.get("/favicon.ico", include_in_schema=False)
|
|
async def favicon():
|
|
return FileResponse(Path(__file__).parent / "icons" / "icon.png")
|
|
|
|
@app.get("/favicon-16x16.png", include_in_schema=False)
|
|
async def favicon_16():
|
|
return FileResponse(Path(__file__).parent / "icons" / "favicon-16x16.png")
|
|
|
|
@app.get("/favicon-32x32.png", include_in_schema=False)
|
|
async def favicon_32():
|
|
return FileResponse(Path(__file__).parent / "icons" / "favicon-32x32.png")
|
|
|
|
@app.get("/favicon-192x192.png", include_in_schema=False)
|
|
async def favicon_192():
|
|
return FileResponse(Path(__file__).parent / "icons" / "favicon-192x192.png")
|
|
|
|
def validate_upload(file: UploadFile):
|
|
# Check size
|
|
file.file.seek(0, 2)
|
|
size = file.file.tell()
|
|
file.file.seek(0)
|
|
|
|
if size > MAX_FILE_SIZE:
|
|
raise ValueError(f"File too large (Limit: {MAX_FILE_SIZE//1024//1024}MB)")
|
|
|
|
if file.content_type not in ALLOWED_TYPES:
|
|
# Fallback: check extension if content_type is generic
|
|
ext = Path(file.filename).suffix.lower()
|
|
if ext not in ['.zip', '.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']:
|
|
raise ValueError("Invalid file type")
|
|
# Scan for malicious content
|
|
return True
|
|
|
|
@app.post("/api/chat")
|
|
async def chat_endpoint(request: ChatRequest):
|
|
response = server_buddai.chat(request.message, force_model=request.model, forge_mode=request.forge_mode)
|
|
return {"response": response}
|
|
|
|
@app.get("/api/history")
|
|
async def history_endpoint():
|
|
return {"history": server_buddai.context_messages}
|
|
|
|
@app.get("/api/sessions")
|
|
async def sessions_endpoint():
|
|
return {"sessions": server_buddai.get_sessions()}
|
|
|
|
@app.post("/api/session/load")
|
|
async def load_session_endpoint(req: SessionLoadRequest):
|
|
history = server_buddai.load_session(req.session_id)
|
|
return {"history": history, "session_id": req.session_id}
|
|
|
|
@app.post("/api/session/rename")
|
|
async def rename_session_endpoint(req: SessionRenameRequest):
|
|
server_buddai.rename_session(req.session_id, req.title)
|
|
return {"status": "success"}
|
|
|
|
@app.post("/api/session/delete")
|
|
async def delete_session_endpoint(req: SessionDeleteRequest):
|
|
server_buddai.delete_session(req.session_id)
|
|
return {"status": "success"}
|
|
|
|
@app.post("/api/session/new")
|
|
async def new_session_endpoint():
|
|
new_id = server_buddai.start_new_session()
|
|
return {"session_id": new_id}
|
|
|
|
@app.post("/api/upload")
|
|
async def upload_repo(file: UploadFile = File(...)):
|
|
try:
|
|
validate_upload(file)
|
|
|
|
uploads_dir = DATA_DIR / "uploads"
|
|
uploads_dir.mkdir(exist_ok=True)
|
|
|
|
file_location = uploads_dir / file.filename
|
|
with open(file_location, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
if file.filename.endswith(".zip"):
|
|
extract_path = uploads_dir / file_location.stem
|
|
with zipfile.ZipFile(file_location, 'r') as zip_ref:
|
|
zip_ref.extractall(extract_path)
|
|
server_buddai.index_local_repositories(extract_path)
|
|
file_location.unlink() # Cleanup zip
|
|
return {"message": f"✅ Successfully indexed {file.filename}"}
|
|
else:
|
|
# Support single code files by moving them to a folder and indexing
|
|
if file_location.suffix in ['.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']:
|
|
target_dir = uploads_dir / file_location.stem
|
|
target_dir.mkdir(exist_ok=True)
|
|
final_path = target_dir / file.filename
|
|
shutil.move(str(file_location), str(final_path))
|
|
server_buddai.index_local_repositories(target_dir)
|
|
return {"message": f"✅ Successfully indexed {file.filename}"}
|
|
|
|
return {"message": f"✅ Successfully uploaded {file.filename}"}
|
|
except Exception as e:
|
|
return {"message": f"❌ Error: {str(e)}"}
|
|
|
|
def check_ollama():
|
|
try:
|
|
conn = http.client.HTTPConnection(OLLAMA_HOST, OLLAMA_PORT, timeout=5)
|
|
conn.request("GET", "/api/tags")
|
|
response = conn.getresponse()
|
|
conn.close()
|
|
return response.status == 200
|
|
except:
|
|
return False
|
|
|
|
|
|
def main():
|
|
if not check_ollama():
|
|
print("❌ Ollama not running. Start: ollama serve")
|
|
sys.exit(1)
|
|
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--server":
|
|
if SERVER_AVAILABLE:
|
|
print("🚀 Starting BuddAI API Server on port 8000...")
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
|
else:
|
|
print("❌ Server dependencies missing. Install: pip install fastapi uvicorn aiofiles python-multipart")
|
|
else:
|
|
buddai = BuddAI()
|
|
buddai.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |