mirror of
https://github.com/JamesTheGiblet/BuddAI.git
synced 2026-01-08 21:58:40 +00:00
Add unit tests for BuddAI v3.2 to verify type hints and routing logic
- Implemented tests for method annotations to ensure type hints are present. - Added tests for routing logic to validate behavior for simple questions, complex requests, search queries, and forced model scenarios. - Verified module extraction logic with specific test cases. - Mocked database interactions and suppressed print statements during tests.
This commit is contained in:
parent
b8a34be03f
commit
c214b4a8ea
6 changed files with 1802 additions and 23 deletions
BIN
__pycache__/buddai_v3.2.cpython-313.pyc
Normal file
BIN
__pycache__/buddai_v3.2.cpython-313.pyc
Normal file
Binary file not shown.
1305
buddai_v3.2.py
Normal file
1305
buddai_v3.2.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -214,6 +214,7 @@
|
|||
const [isSidebarOpen, setIsSidebarOpen] = useState(false);
|
||||
const endRef = useRef(null);
|
||||
const abortControllerRef = useRef(null);
|
||||
const socketRef = useRef(null);
|
||||
|
||||
const scrollToBottom = () => {
|
||||
endRef.current?.scrollIntoView({ behavior: "smooth" });
|
||||
|
|
@ -266,6 +267,17 @@
|
|||
return () => clearInterval(timer);
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
// Initialize WebSocket
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
const wsUrl = `${protocol}//${window.location.host}/api/ws/chat`;
|
||||
socketRef.current = new WebSocket(wsUrl);
|
||||
|
||||
return () => {
|
||||
if (socketRef.current) socketRef.current.close();
|
||||
};
|
||||
}, []);
|
||||
|
||||
const handleRename = async (e) => {
|
||||
if (e.key === 'Enter') {
|
||||
await fetch("/api/session/rename", {
|
||||
|
|
@ -327,30 +339,49 @@
|
|||
if (!textOverride) setInput("");
|
||||
setLoading(true);
|
||||
|
||||
// Cancel previous request if any
|
||||
if (abortControllerRef.current) abortControllerRef.current.abort();
|
||||
const controller = new AbortController();
|
||||
abortControllerRef.current = controller;
|
||||
// Use WebSocket if available
|
||||
if (socketRef.current && socketRef.current.readyState === WebSocket.OPEN) {
|
||||
// Add placeholder for streaming response
|
||||
setHistory(prev => [...prev, { role: "assistant", content: "" }]);
|
||||
|
||||
socketRef.current.send(JSON.stringify({
|
||||
message: msgText,
|
||||
forge_mode: forgeMode,
|
||||
user_id: "default" // In a real app, get from auth context
|
||||
}));
|
||||
|
||||
try {
|
||||
const res = await fetch("/api/chat", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ message: msgText, forge_mode: forgeMode }),
|
||||
signal: controller.signal
|
||||
});
|
||||
const data = await res.json();
|
||||
setHistory(prev => [...prev, { role: "assistant", content: data.response }]);
|
||||
if (!currentSessionId) fetchSessions(); // Refresh list if this was first msg
|
||||
} catch (err) {
|
||||
if (err.name === 'AbortError') {
|
||||
setHistory(prev => [...prev, { role: "assistant", content: "🛑 *Generation stopped by user.*" }]);
|
||||
} else {
|
||||
socketRef.current.onmessage = (event) => {
|
||||
const data = JSON.parse(event.data);
|
||||
if (data.type === 'token') {
|
||||
setHistory(prev => {
|
||||
const newHistory = [...prev];
|
||||
const lastMsg = newHistory[newHistory.length - 1];
|
||||
if (lastMsg.role === 'assistant') {
|
||||
lastMsg.content += data.content;
|
||||
}
|
||||
return newHistory;
|
||||
});
|
||||
} else if (data.type === 'end') {
|
||||
setLoading(false);
|
||||
if (!currentSessionId) fetchSessions();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// Fallback to HTTP
|
||||
try {
|
||||
const res = await fetch("/api/chat", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ message: msgText, forge_mode: forgeMode })
|
||||
});
|
||||
const data = await res.json();
|
||||
setHistory(prev => [...prev, { role: "assistant", content: data.response }]);
|
||||
if (!currentSessionId) fetchSessions();
|
||||
} catch (err) {
|
||||
setHistory(prev => [...prev, { role: "assistant", content: "Error connecting to BuddAI server." }]);
|
||||
}
|
||||
setLoading(false);
|
||||
}
|
||||
setLoading(false);
|
||||
abortControllerRef.current = null;
|
||||
};
|
||||
|
||||
const stopGeneration = () => {
|
||||
|
|
@ -394,7 +425,7 @@
|
|||
<div className="header">
|
||||
<div style={{display:'flex', alignItems:'center'}}>
|
||||
<img src="/favicon.ico" alt="BuddAI" style={{height: '24px', marginRight: '10px'}} />
|
||||
<h3 style={{margin:0}}>BuddAI v3</h3>
|
||||
<h3 style={{margin:0}}>BuddAI v3.2</h3>
|
||||
<span className={`status-badge ${status}`}>{status}</span>
|
||||
</div>
|
||||
<div style={{display:'flex', gap:'10px'}}>
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
BuddAI v3.1 Test Suite
|
||||
BuddAI v3.2 Test Suite
|
||||
Comprehensive testing for all features
|
||||
|
||||
Author: James Gilbert
|
||||
|
|
@ -8,11 +8,25 @@ License: MIT
|
|||
"""
|
||||
|
||||
import sys
|
||||
import importlib.util
|
||||
from unittest.mock import MagicMock, patch
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import os
|
||||
import io
|
||||
import zipfile
|
||||
|
||||
# Dynamic import of buddai_v3.2.py
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
MODULE_PATH = REPO_ROOT / "buddai_v3.2.py"
|
||||
spec = importlib.util.spec_from_file_location("buddai_v3_2", MODULE_PATH)
|
||||
buddai_module = importlib.util.module_from_spec(spec)
|
||||
sys.modules["buddai_v3_2"] = buddai_module
|
||||
spec.loader.exec_module(buddai_module)
|
||||
BuddAI = buddai_module.BuddAI
|
||||
|
||||
# Test utilities
|
||||
class TestColors:
|
||||
|
|
@ -544,10 +558,318 @@ def test_context_window():
|
|||
return False
|
||||
|
||||
|
||||
# Test 12: Schedule Awareness (New)
|
||||
def test_schedule_awareness():
|
||||
print_test("Schedule Awareness")
|
||||
|
||||
# Mock datetime to test different times
|
||||
with patch('buddai_v3_2.datetime') as mock_date:
|
||||
# 1. Early Morning (Monday 6:00 AM)
|
||||
mock_date.now.return_value = datetime(2025, 12, 29, 6, 0, 0)
|
||||
|
||||
buddai = BuddAI(server_mode=False)
|
||||
status = buddai.get_user_status()
|
||||
|
||||
if "Early Morning" in status:
|
||||
print_pass(f"6:00 AM Mon -> {status}")
|
||||
else:
|
||||
print_fail(f"Failed Morning check: {status}")
|
||||
return False
|
||||
|
||||
# 2. Work Hours (Monday 10:00 AM)
|
||||
mock_date.now.return_value = datetime(2025, 12, 29, 10, 0, 0)
|
||||
status = buddai.get_user_status()
|
||||
|
||||
if "Work Hours" in status:
|
||||
print_pass(f"10:00 AM Mon -> {status}")
|
||||
else:
|
||||
print_fail(f"Failed Work check: {status}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Test 13: Modular Plan Generation (New)
|
||||
def test_modular_plan():
|
||||
print_test("Modular Plan Generation")
|
||||
|
||||
buddai = BuddAI(server_mode=False)
|
||||
modules = ["ble", "servo"]
|
||||
plan = buddai.build_modular_plan(modules)
|
||||
|
||||
# Expect 3 steps: ble, servo, integration
|
||||
if len(plan) == 3:
|
||||
tasks = [p['module'] for p in plan]
|
||||
if "integration" in tasks and "ble" in tasks:
|
||||
print_pass(f"Generated {len(plan)} steps including Integration")
|
||||
return True
|
||||
|
||||
print_fail(f"Plan generation failed. Got {len(plan)} steps: {plan}")
|
||||
return False
|
||||
|
||||
|
||||
# Test 14: Session Management (New)
|
||||
def test_session_management():
|
||||
print_test("Session Management (CRUD)")
|
||||
|
||||
# Use a named temporary file to handle Windows file locking better
|
||||
fd, test_db_path = tempfile.mkstemp(suffix=".db")
|
||||
os.close(fd)
|
||||
test_db = Path(test_db_path)
|
||||
|
||||
try:
|
||||
with patch('buddai_v3_2.DB_PATH', test_db):
|
||||
buddai = BuddAI(server_mode=False)
|
||||
|
||||
# 1. Create
|
||||
sid = buddai.start_new_session()
|
||||
print_pass(f"Created session: {sid}")
|
||||
|
||||
# 2. Rename
|
||||
buddai.rename_session(sid, "Unit Test Session")
|
||||
sessions = buddai.get_sessions(limit=1)
|
||||
if not sessions or sessions[0]['title'] != "Unit Test Session":
|
||||
print_fail("Rename failed")
|
||||
return False
|
||||
print_pass("Renamed session successfully")
|
||||
|
||||
# 3. Delete
|
||||
buddai.delete_session(sid)
|
||||
sessions = buddai.get_sessions(limit=5)
|
||||
if any(s['id'] == sid for s in sessions):
|
||||
print_fail("Delete failed - session still exists")
|
||||
return False
|
||||
print_pass("Deleted session successfully")
|
||||
finally:
|
||||
# Manual cleanup with error suppression for Windows locks
|
||||
try:
|
||||
if test_db.exists():
|
||||
os.unlink(test_db)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Test 15: Rapid Session Creation (Collision Handling)
|
||||
def test_rapid_session_creation():
|
||||
print_test("Rapid Session Creation (Collision Handling)")
|
||||
|
||||
# Use a named temporary file to handle Windows file locking better
|
||||
fd, test_db_path = tempfile.mkstemp(suffix=".db")
|
||||
os.close(fd)
|
||||
test_db = Path(test_db_path)
|
||||
|
||||
try:
|
||||
# Mock datetime to return a fixed time, forcing ID collisions
|
||||
fixed_time = datetime(2025, 1, 1, 12, 0, 0)
|
||||
|
||||
with patch('buddai_v3_2.DB_PATH', test_db):
|
||||
with patch('buddai_v3_2.datetime') as mock_dt:
|
||||
mock_dt.now.return_value = fixed_time
|
||||
|
||||
buddai = BuddAI(server_mode=False)
|
||||
|
||||
ids = [buddai.session_id] # Capture session from __init__
|
||||
|
||||
# Create 4 more sessions rapidly
|
||||
for _ in range(4):
|
||||
ids.append(buddai.start_new_session())
|
||||
|
||||
# Verify format
|
||||
base_id = fixed_time.strftime("%Y%m%d_%H%M%S")
|
||||
expected = [base_id] + [f"{base_id}_{i}" for i in range(1, 5)]
|
||||
|
||||
if ids == expected:
|
||||
print_pass(f"Generated unique IDs with suffixes: {ids}")
|
||||
return True
|
||||
else:
|
||||
print_fail(f"Unexpected ID format. Expected {expected}, got {ids}")
|
||||
return False
|
||||
finally:
|
||||
try:
|
||||
if test_db.exists():
|
||||
os.unlink(test_db)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Test 16: Repository Isolation (Multi-User)
|
||||
def test_repo_isolation():
|
||||
print_test("Repository Isolation (Multi-User)")
|
||||
|
||||
# Use a named temporary file for DB
|
||||
fd, test_db_path = tempfile.mkstemp(suffix=".db")
|
||||
os.close(fd)
|
||||
test_db = Path(test_db_path)
|
||||
|
||||
# Create a temp directory for repo
|
||||
with tempfile.TemporaryDirectory() as tmp_repo:
|
||||
repo_path = Path(tmp_repo)
|
||||
|
||||
# Create a unique file for User 1
|
||||
(repo_path / "user1_secret.py").write_text("def user1_secret_function():\n pass")
|
||||
|
||||
try:
|
||||
with patch('buddai_v3_2.DB_PATH', test_db):
|
||||
# Suppress internal prints to keep test output clean
|
||||
with patch('builtins.print'):
|
||||
# User 1 indexes the repo
|
||||
buddai1 = BuddAI(user_id="user1", server_mode=False)
|
||||
buddai1.index_local_repositories(str(repo_path))
|
||||
|
||||
# User 2 instance
|
||||
buddai2 = BuddAI(user_id="user2", server_mode=False)
|
||||
|
||||
# User 1 searches
|
||||
res1 = buddai1.search_repositories("user1_secret_function")
|
||||
|
||||
# User 2 searches
|
||||
res2 = buddai2.search_repositories("user1_secret_function")
|
||||
|
||||
# Verify User 1 found it
|
||||
if "Found 1 matches" in res1 or "user1_secret_function" in res1:
|
||||
print_pass("User 1 found their indexed code")
|
||||
else:
|
||||
print_fail(f"User 1 failed to find code: {res1}")
|
||||
return False
|
||||
|
||||
# Verify User 2 did NOT find it
|
||||
if "No functions found" in res2:
|
||||
print_pass("User 2 could not see User 1's code")
|
||||
else:
|
||||
print_fail(f"User 2 saw restricted code: {res2}")
|
||||
return False
|
||||
|
||||
finally:
|
||||
try:
|
||||
if test_db.exists():
|
||||
os.unlink(test_db)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
# Test 17: Upload Security (Hardening)
|
||||
def test_upload_security():
|
||||
print_test("Upload Security (Hardening)")
|
||||
|
||||
# 1. Test Magic Byte Check
|
||||
# We need to mock UploadFile since it's a FastAPI class
|
||||
class MockUploadFile:
|
||||
def __init__(self, filename, content):
|
||||
self.filename = filename
|
||||
self.file = io.BytesIO(content)
|
||||
self.content_type = "application/zip"
|
||||
|
||||
if hasattr(buddai_module, 'validate_upload'):
|
||||
# Create a fake zip (text file renamed)
|
||||
fake_zip = MockUploadFile("fake.zip", b"This is not a zip file")
|
||||
try:
|
||||
buddai_module.validate_upload(fake_zip)
|
||||
print_fail("Magic byte check failed (accepted invalid zip)")
|
||||
return False
|
||||
except ValueError as e:
|
||||
if "Invalid ZIP file header" in str(e):
|
||||
print_pass("Magic byte check rejected invalid zip header")
|
||||
else:
|
||||
print_fail(f"Unexpected error: {e}")
|
||||
return False
|
||||
else:
|
||||
print_warn("Skipping magic byte check (validate_upload not available)")
|
||||
|
||||
# 2. Test Zip Slip Protection
|
||||
if hasattr(buddai_module, 'safe_extract_zip'):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
malicious_zip_path = Path(tmpdir) / "slip.zip"
|
||||
extract_dir = Path(tmpdir) / "extract"
|
||||
extract_dir.mkdir()
|
||||
|
||||
# Create a zip file with a member pointing to parent directory
|
||||
# We use zipfile to craft this manually
|
||||
zip_buffer = io.BytesIO()
|
||||
with zipfile.ZipFile(zip_buffer, 'w') as zf:
|
||||
zf.writestr('../evil.txt', 'malicious content')
|
||||
|
||||
malicious_zip_path.write_bytes(zip_buffer.getvalue())
|
||||
|
||||
try:
|
||||
buddai_module.safe_extract_zip(malicious_zip_path, extract_dir)
|
||||
print_fail("Zip Slip protection failed (extracted malicious file)")
|
||||
return False
|
||||
except ValueError as e:
|
||||
if "Malicious zip member" in str(e):
|
||||
print_pass("Zip Slip protection caught directory traversal")
|
||||
else:
|
||||
print_fail(f"Unexpected error during extraction: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
# Test 18: WebSocket Logic (Streaming)
|
||||
def test_websocket_logic():
|
||||
print_test("WebSocket Logic (Streaming)")
|
||||
|
||||
# Use a named temporary file for DB
|
||||
fd, test_db_path = tempfile.mkstemp(suffix=".db")
|
||||
os.close(fd)
|
||||
test_db = Path(test_db_path)
|
||||
|
||||
try:
|
||||
with patch('buddai_v3_2.DB_PATH', test_db):
|
||||
# Suppress prints during init
|
||||
with patch('builtins.print'):
|
||||
buddai = BuddAI(server_mode=False)
|
||||
|
||||
# Mock call_model to return a generator
|
||||
def mock_generator(*args, **kwargs):
|
||||
yield "Stream"
|
||||
yield "ing"
|
||||
yield "..."
|
||||
|
||||
with patch.object(buddai, 'call_model', side_effect=mock_generator) as mock_call:
|
||||
# Mock shadow engine to avoid DB lookups or side effects affecting output
|
||||
with patch.object(buddai.shadow_engine, 'get_all_suggestions', return_value=[]):
|
||||
|
||||
# Execute
|
||||
stream = buddai.chat_stream("Test Message", force_model="fast")
|
||||
chunks = list(stream)
|
||||
full_text = "".join(chunks)
|
||||
|
||||
# Verify 1: Content
|
||||
if full_text == "Streaming...":
|
||||
print_pass("Streamed content matches expected output")
|
||||
else:
|
||||
print_fail(f"Stream content mismatch. Got: '{full_text}'")
|
||||
return False
|
||||
|
||||
# Verify 2: Stream flag passed to model
|
||||
args, kwargs = mock_call.call_args
|
||||
if kwargs.get('stream') is True:
|
||||
print_pass("call_model invoked with stream=True")
|
||||
else:
|
||||
print_fail(f"call_model arguments incorrect: {kwargs}")
|
||||
return False
|
||||
|
||||
# Verify 3: Context saved
|
||||
last_msg = buddai.context_messages[-1]
|
||||
if last_msg['role'] == 'assistant' and last_msg['content'] == "Streaming...":
|
||||
print_pass("Conversation context updated correctly")
|
||||
else:
|
||||
print_fail("Context update failed")
|
||||
return False
|
||||
|
||||
finally:
|
||||
try:
|
||||
if test_db.exists():
|
||||
os.unlink(test_db)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
# Main Test Runner
|
||||
def run_all_tests():
|
||||
print("\n" + "="*60)
|
||||
print("🔥 BuddAI v3.1 Comprehensive Test Suite")
|
||||
print("🔥 BuddAI v3.2 Comprehensive Test Suite")
|
||||
print("="*60)
|
||||
|
||||
tests = [
|
||||
|
|
@ -562,6 +884,13 @@ def run_all_tests():
|
|||
("Repository Indexing", test_repository_indexing),
|
||||
("Search Query Safety", test_search_query_safety),
|
||||
("Context Window", test_context_window),
|
||||
("Schedule Awareness", test_schedule_awareness),
|
||||
("Modular Plan", test_modular_plan),
|
||||
("Session Management", test_session_management),
|
||||
("Rapid Session Creation", test_rapid_session_creation),
|
||||
("Repository Isolation", test_repo_isolation),
|
||||
("Upload Security", test_upload_security),
|
||||
("WebSocket Logic", test_websocket_logic),
|
||||
]
|
||||
|
||||
results = []
|
||||
|
|
|
|||
114
tests/test_buddai_v3_2.py
Normal file
114
tests/test_buddai_v3_2.py
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for BuddAI v3.2
|
||||
Verifies type hints and core functionality including the new routing logic.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Load buddai_v3.2.py dynamically due to version number in filename
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
MODULE_PATH = REPO_ROOT / "buddai_v3.2.py"
|
||||
|
||||
if not MODULE_PATH.exists():
|
||||
print(f"Error: Could not find {MODULE_PATH}")
|
||||
sys.exit(1)
|
||||
|
||||
spec = importlib.util.spec_from_file_location("buddai_v3_2", MODULE_PATH)
|
||||
buddai_module = importlib.util.module_from_spec(spec)
|
||||
sys.modules["buddai_v3_2"] = buddai_module
|
||||
spec.loader.exec_module(buddai_module)
|
||||
|
||||
BuddAI = buddai_module.BuddAI
|
||||
|
||||
class TestBuddAITypesAndLogic(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Suppress print statements during tests
|
||||
self.original_stdout = sys.stdout
|
||||
sys.stdout = MagicMock()
|
||||
|
||||
# Initialize BuddAI in non-server mode, mocking DB interactions
|
||||
with patch('sqlite3.connect') as mock_sql:
|
||||
# Mock mkdir to prevent creating directories during tests
|
||||
with patch('pathlib.Path.mkdir'):
|
||||
self.buddai = BuddAI(server_mode=False)
|
||||
self.mock_conn = mock_sql.return_value
|
||||
self.mock_cursor = self.mock_conn.cursor.return_value
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdout = self.original_stdout
|
||||
|
||||
def test_method_annotations(self):
|
||||
"""Verify type hints exist on key methods"""
|
||||
# chat
|
||||
chat_hints = BuddAI.chat.__annotations__
|
||||
self.assertEqual(chat_hints['user_message'], str)
|
||||
self.assertEqual(chat_hints['return'], str)
|
||||
|
||||
# is_complex
|
||||
self.assertEqual(BuddAI.is_complex.__annotations__['return'], bool)
|
||||
|
||||
# extract_modules
|
||||
self.assertEqual(BuddAI.extract_modules.__annotations__['return'], List[str])
|
||||
|
||||
# build_modular_plan
|
||||
self.assertEqual(BuddAI.build_modular_plan.__annotations__['return'], List[Dict[str, str]])
|
||||
|
||||
def test_routing_simple_question(self):
|
||||
"""Test that simple questions route to the FAST model"""
|
||||
with patch.object(self.buddai, 'call_model', return_value="Fast response") as mock_call:
|
||||
response = self.buddai._route_request("What is a servo?", force_model=None, forge_mode="2")
|
||||
|
||||
mock_call.assert_called_with("fast", "What is a servo?")
|
||||
self.assertEqual(response, "Fast response")
|
||||
|
||||
def test_routing_complex_request(self):
|
||||
"""Test that complex requests route to modular build"""
|
||||
complex_msg = "Build a complete robot with servo and motor"
|
||||
|
||||
with patch.object(self.buddai, 'execute_modular_build', return_value="Modular code") as mock_build:
|
||||
# Mock is_complex to ensure it returns True for this test case
|
||||
with patch.object(self.buddai, 'is_complex', return_value=True):
|
||||
response = self.buddai._route_request(complex_msg, force_model=None, forge_mode="2")
|
||||
|
||||
mock_build.assert_called()
|
||||
self.assertEqual(response, "Modular code")
|
||||
|
||||
def test_routing_search_query(self):
|
||||
"""Test that search queries route to repository search"""
|
||||
search_msg = "Show me functions using applyForge"
|
||||
|
||||
with patch.object(self.buddai, 'search_repositories', return_value="Search results") as mock_search:
|
||||
# Mock is_search_query to ensure True
|
||||
with patch.object(self.buddai, 'is_search_query', return_value=True):
|
||||
# Ensure is_complex is False so it doesn't preempt search
|
||||
with patch.object(self.buddai, 'is_complex', return_value=False):
|
||||
response = self.buddai._route_request(search_msg, force_model=None, forge_mode="2")
|
||||
|
||||
mock_search.assert_called_with(search_msg)
|
||||
self.assertEqual(response, "Search results")
|
||||
|
||||
def test_routing_forced_model(self):
|
||||
"""Test that force_model overrides other logic"""
|
||||
with patch.object(self.buddai, 'call_model', return_value="Forced response") as mock_call:
|
||||
response = self.buddai._route_request("Complex build request", force_model="balanced", forge_mode="2")
|
||||
|
||||
mock_call.assert_called_with("balanced", "Complex build request")
|
||||
self.assertEqual(response, "Forced response")
|
||||
|
||||
def test_extract_modules(self):
|
||||
"""Verify module extraction logic"""
|
||||
msg = "I need a robot with bluetooth and a flipper weapon"
|
||||
modules = self.buddai.extract_modules(msg)
|
||||
self.assertIn("ble", modules)
|
||||
self.assertIn("servo", modules)
|
||||
self.assertNotIn("motor", modules)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue