From 03d87ec1745503db012234a6fcb8a59c7ed635ba Mon Sep 17 00:00:00 2001 From: JamesTheGiblet Date: Mon, 29 Dec 2025 15:45:31 +0000 Subject: [PATCH] Add unit tests for BuddAI v3.2 to verify type hints and routing logic - Implemented tests for method annotations to ensure type hints are present. - Added tests for routing logic to validate behavior for simple questions, complex requests, search queries, and forced model scenarios. - Verified module extraction logic with specific test cases. - Mocked database interactions and suppressed print statements during tests. --- __pycache__/buddai_v3.2.cpython-313.pyc | Bin 0 -> 67348 bytes buddai_v3.1.py => archive/buddai_v3.1.py | 0 buddai_v3.2.py | 1305 ++++++++++++++++++++++ data/conversations.db | Bin 55033856 -> 55033856 bytes frontend/index.html | 73 +- tests/test_buddai.py | 333 +++++- tests/test_buddai_v3_2.py | 114 ++ 7 files changed, 1802 insertions(+), 23 deletions(-) create mode 100644 __pycache__/buddai_v3.2.cpython-313.pyc rename buddai_v3.1.py => archive/buddai_v3.1.py (100%) create mode 100644 buddai_v3.2.py create mode 100644 tests/test_buddai_v3_2.py diff --git a/__pycache__/buddai_v3.2.cpython-313.pyc b/__pycache__/buddai_v3.2.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6e1d2dec21d9d42f715255e87512d0c9a00bce41 GIT binary patch literal 67348 zcmeFac~o52nJ;>3f~umRfMOKWVG^f(kz4zJkH-GzHYO0yTmGMfyTL^O8ztMwo8ReUMKS<)Z zmpFkFIYA?6yELMP{c1%m`_+j${OY>&r}Uzpr!@L5!>J@OiQQ|vl0_p+XA({LHFTLz zrHCn~ETZL9s+f8zO-wsw6|JYz#dJA+hM2+9XNsA{R9aG(?NpYSbt+rTK9wWpoXQn* zPvwbur}D*omLs`q+o=MvfZZFr3QrY@MeN?xReY*MEIGxCJbN~Gm7XdS%h zQp5@&RopJ5iB&^MW!zBpUX9e|{(hwsqQ8Q*E`{U7nxPsYo!#vaGQ?UTQ><&?1lt*n zSTAG=*+NdeR%}2_E@F1#H}8xFVLl7Dv9N%Jg)A&OqY;Yhlf+$lTp1^nY~uueuU7h| zxSPe5BCbr0YaFVmmaBA9>-2KmP!{r)3l&0TlUCd#KW&$v_ViCi>#y7m6$n)g8uhQb z31h~-WXG3khRXKpgx@ zd*#ZA2R}x?H?rP7G<2x{DWwh3Um-=<)1=|Kp*{W2L}QgZVee28#`O?V?Q7Bs`-KBd zI+pf1B}MdCXd2>$W~4f}En2^0C^u2sLxM#(jI>8$(za}o_NbipSWMdETcmA~(;kmW z+qy;CRyl23OxiY~eJEQvG1R_CFPz-QJ+2o|2pvNw5pxnToh+t9I5m_cbPXjTrklld z;tMZDdqTMrdX)7rl#5x`t32(|2z^8Qgww3dGgQLk8g)dwP}WVQgy^r(ujCelp}aj> zVSw_fxq7x}qeIDcUk&K%TH$Qm*L$}p=iEce8H_8ZZ;Ku|zhya(4DFYD={*`HqRw3 zo3YYd8jCA=fX!sB;G%fZAzTv1+1=TpbHYUPAjdfYqlV5#_Yeq=Tt=B?e_U2`Zo8dKVle|#kV5{hX?N>rjw|r==6I< zWJo^Y^!Zy(cSg*PnJKSx?1UQ$^(;*4a5>T9i0PbbMDUJIy8IEFdfVpqje0#E*Qg&~ zPiyV#7kb=dV^glnPLb-9BKVzt_b8R)vuh(3y6JcMX1r)d#L_X))2%!qvFW(e=R!Zb zrUp(#OjF)b=hP*y&+q<9EyqRlXr(idj*mU*5`8EXMbwXtxG|Kv_Xt(7rv$9dnVBi~ zD0|ruaL)t^V|5_BI#I2y=& z;fJLdLG1d^On zQ@nTFu8WvPre`r@iIP!arNPyf#r=Pk59R0W-wQhF1wr>8cC)p?43p9#+~jdij0_$RGe^mMz7)% z$s=e{d(12hRf9eHvj$QZu8b2>_LmUuWj*wa)v|3RwWC*$;gP>=C(jaY3 zVzQ7aBmsOT4;cmHkZH&~geRoX8BhlO?MV_$a@rJ@(l}%pN@8#Hf|+e!sX_`$h>f?f z_%s%u%HEmeJV`$?NHV=2c8`t9I3Yg4afF}J^Swd@FL8tEQ}~sxL4HccpgC@WdsWl>s#e^GSK@vI z7&^VrH8mcM@li26&qt26HV--QDc{fpZGuB>-qBf_Kfa-pZhyz@$Pm`+*tDze5^hJ> z*3EvL?qTfPyX$AJMvTM5ZcLEj;Ydm1{;5|IbmN;o3xeml_1xkYEC@bhxrb*ik|cE$ zW-U#oNRsIC&x)Q+jZa28*7#rI<|Xa|!%Xq zE(I;}=pFigQ`}2D5_#m%q;Z0_5i?b}!?zeQw>iB-yGO4e-#mx%Qa8V+C-r*<)W^E@ zd*nA%U>pMnyS?5?es+czU4T`7Oe3sR9#aNO*W=^Iy&~_FXB|H-dZ&?Y#)|?`ZT>KSbW>q{{cY|2{PFW~-*0Od zT9F?>IWB!fQ)xFwl6_B1x&5xjNU{tm*))$Njn0a+StYv|s^MZQ0h^!8KY9s!Zp1J; z<@LGjN#a3tyLgO(76cJJl_H*`+hhQ;*(txz2ZV-``b|7UFSbo#6lQ0Jr9q&c!YUtT zV-eZ4#VXi3%V`wp^B{Ph`I?ETWja^>lxWM3a%G?u4pc4-I(@b z+O@O|tv)qqxe7XN^8>1Lvdfr=eWq1$7ZSj7q&vYkSf;E`{Tf&ZEZi z+(YY-98(9=7IiRfszU|GdFT2SaHn)3ewBr<_8;2clh1Km)OUNL`u6;-6pLOoHh&8^ zIq*1#UMbO8gwoV4=YqPqIP^+CpatlM_)9P~Y5O#S*<%(`9A<2FmPTC##@45m`fD7$ zrrg<611I{~R?Rzk05b^yD&PI;w7UffZOTlI!z8?wJgA9+<1(M`IpzV`t>5UA?peJ&S&)jNw_X%_p!Rob0+|0V2V zR280KZcaXMs3X9mF4$CEPs|d^L_nO9wF{>kch|Kx*7L$8?<_FFPV|Ip2Jf7Hc_%(< zH;FyeJT0)tgOm zj3`7>77xo4M}zJB!8qi@*`-Wi>1ThBhUGO%(Yc>LL*?YW@kxpk}U!(Sll!4H3t#1(gG ze3@wa8%5b|M)R8`d2OlYHxF77M$8gI=b4%HC*E`a4twRJX^&d5?`jh5yULC+ZN(0& zjoGUR!;QO?wAclAV;9D(pV@Ok-7y_j@R2k?ZjBDTP2Gu#g3v)ga|ow zN_w8_*(2{mo1uaBf{GON6V?}ZF%%7Zbo`~1>oGVC{TcCZqdTW33H^}FAkA+Hf;f`; zZTF`ZbbvQH>|**x9pG8k{VDcn6}SQp%nDZL>i*WvZx%T)+_I;0WqnUzP{)c@dFw@a@3w;ib$3Y4!zA+TVqLwnHU75(WB`!z)WCPlXB+Ex@+t+$cK|(XHt(wEJHe~+yT`Bc zWvyP&%QOBmzS>nkQD4hf$)tRh-MqV=KQjx;I|j#Q0FyDNs)&g6`4zb1XACYUF_;wM z(hShkIUX$#+yw0ZJlG~Kr|6RaSe3*C7?ybeRkaF?=H32!^C-Gdy|4{&=v?j|unF#n zrW{>Ms34{|P$_T5fR+%sC%Kml7zBV1>iP+8ziwP3HsQsq+^f2Xwti9x zc#7BvGR6741Y%5wz;kW@6po! z^?QIi!OR0+xw4eGQ1Fhe;wx7^yr;*DPiACkdIQ}u$s^M_@q`U85i`nLnTm-cSY$$E z*G{nw<^BCALcK`kP z4@EyEBcw+>gJ1%Gxb7Yeq;&EgFBpfQ?y3SgRSBm|V0#t#r-T}bY-alE`EyQ?oZzTR z>?=Z(oc^N`4R6l`?DQl;}M2Mf9TgvM*vLo*i3IK5>MKGl2sK-V2C2<#zJ2 zPrOKFJW0U}0v{pHF^Cgxj~$bycHUMKH`4Gs&U9?HobT_jHa~ox17ea}x|Xv&l(RjY zQw{v7peSg}dzWumw)!jel5K!lwP%z z^iF!i`v$J`z%PwlIuAVTgCs6JcWY|%Zcg4@%1?5(&z-ny$$YW*T5qtx9rSpEGfxJ+ zmxGtD1gEYBpL*_B9QU-QL;Jq;v%zswnojKp9G7}ZyAI0p==GyvTg6=4Pi?t3^EQ&W zZH05Eeo|a^-8`SW&R2ao^_A2wS?5#Vtv$F_+Y+j6d8f8DlvO?7xt`0vom;bX_3eh! z?-mrV7N30me7N}JTEWS1!O8VKN50wnTJPMrNXfcz)f90l@WJEX*|F^&4Xa|wDAIHa_LG0ffe|P?Y~=w>1Qvf^sK|E zVvAX0o`Pg zm6NBfuKte=Uc_mP^d?Bm07>NV19m3VGzR+m5`f>oL7^h>Urquy=sAvw0U~A+-CRB4 zg^VF!Z=rjhJ@5mUTwr(ezG;$n5PyKAbh6~)sCb$RupONdWlt98sMx0|xI)2I3IY^7MFGJ^ zkm)poM1m^frzrR&f{00l5aKb4`~*d6d=elq;XwfeG6b*@1c>XlNX^IZ;=e?ZzIPG) z8YDFB!q_;o^VhO=g|c=n*MzeU&zXLjlC^HhU9%L2EX4~~S1ooV*#sXx24u*}Tg%!J z%G$AXBAnGYulaAbtodE)uDJNbJ6ZPkO#l}gX7W8vdt zY6XHNr&cXpsDZ^c?+K?=Eaj~fuQm0AntH-bePMfFOv1-kEt7Yx+k(bzY=0D+(MpoB zCXczsonW=vHSf{3XwM+A0`Za|Rxl4M4A$X@Wq9}raB1W>*5ToCx9IavK?>vXB5CsQ z@R)aWcvvKy4_p+#C?3Y0c%A~<=R_j~#ETK>J8aaYiE$l|k?i4N2)2M14TGUBx<_XH zE+0xG%uK59A-db61caLS2m$*K+>iCW?`yS&l12Rnho2?izthkA#xz6jvTcLI&x*i) z?quC@1qaRr&kqG3bp{8Vq27^D=jgvviVfao$XY^b{4D3Kl)qN=0o`uY>@aj{mK!%X z`dvA2yY=P3{f~C<74nMb%06%y31Wya!Z~7J8OLeB|j^akd+~h$<`j%K*?;sAj#9rxJzXzQy|4NKwU(V zrCAVfW$~#(I`U^wxk3sgcbP0UO|Z!^dLfI&SY=5deLI`Qq@#=+b{8up%w;i|F>=E^ z7GslRvW5&a;3y@Z#b*oKhU7TZFl#7h$Uya^G;+EEmM&MWe<6Fy6AlPPs3G-GF{Fk1 zp?u`qhNl#?f=ME!GWHlrrHs^b!~&#>OCj*Av_hdtC`I0AP0H@CiBOId)Q4uFLN2jL zsFdUM!gf|tu~3DW>Y);;m1-WN0M#-ma6r8(VT#T5G72^F7fQECZ$P`Cmd4suCLDx% zS~=>LsP~|z#!^*4SUR~y=Nf`m`rL7*IPP z9<(Bx^952aUvlFERFE%V8Ur22UvfU_qIj3+;>RG8a|co=3U2^S#{wy?D^Aj<@qt1N z7}+;bFt%&A584}7|JeA+s}lJOT?Nq%A-fn!m4|675;$U)#=PF`8QNKWFKy+K(CgD2pgS8^-$X6nno>jxGt-L{(9_4?6pW>?tS6*P89 zgoJ@LYtuv}%!5tT|B4;!(d>td#4k{d*`-WH4H21E`zqxM(~oUr}n~ zF@Uy80&f4n)Qoz_VtD^U(>cpPn%=IR#3iLfO9KBBeAM_0 z5=)t$#E2La%5kg^yiQdjC|F|BX!{G|%ZTJX(~O*{A~--k7LPiDzw0J#~p_;K3zj z9N!Q_&cWsg&OJd|Q@dSb!jG59|^oY#<8tnM{B* z0-q!M=` zD@F&&&bah}F1csa*R@g2B;3X-FR5BkTl8M_M|x6h3hcX2l?l-gQ(xM(bC%|=+zrZi z;BSkR+e#6IAmk=Q2@Us74e z1UY4q5QaHUBxKhxv}%)R1Dt%1D*+>_PDYw!ge$gr2~fI^@LAtB1d`k*=kPZ=mPQr^ zZas4Ik(YFO;qQ5_gJ5C!(i`ZO@Sq)2~q$ zRj?t6Cq6-`(tI4zu+|@(s_Jo9S~$l4hKu6VjMuSu_F`$rDo|7`xEGglaH^ zbrFplauE;+5tFJIND9xMQAs!g1x*ZGB-Q63MP$hK*kiK%D87!Od^rfvZ}(pce=Nc1 zMz(D|@;{*+Z+tkcz0_;GFSpe!*PY zdP?3x`>n2y~Y6meP==bg_2T(hz;x4sP6fO6I)l z#^m+M&rOF@%A@b9RxLH_#`G7h*Q^U^tH!FITXWyFWCSzoRxS1Gmh=}-T|4zc_qwef zyfpCr%ERdGCkLN7S1eOUT{P2@5s`brOQk1 zhO*Ppulu?@9`X^FG|KXXf3nM@R}i+549L-@N+T)!TXD{ilNa zx`JI#2W`&;EzjJw6+y$M>gB10M^UYc@7Q0rf3Nk9@w+|Y=Kf%l5ERY@GY039-fcXz za_+YCoyLxkrD3imWNnz&FJ>)XzMEBWqvv|hVnsNsYOZ~~uz0R#J+EkCbU}RCJg{u1OxRYrlnGJKkF)Z@$;>bRrJ2hr zUFRzoM;FCK^L*-h&bAxw>+Z$Wa85mSapr<{Vb?6%e#X)`$Cp| zb0_Gv@vpKL-CrtME8QC^-5cchEsq3qnnIQ)>8*Kj|Dt_qe=w&pWNDP6LYDGRop`^5 z+taDp;JNgixm1ZKx1W|=3MutVlE%%FAzJN>vilDjV$j9vc>{x0*$hY}U&=lV{FHl; z#Dcv~R&qsumHZBFioVS*Xp|Etj9m>VcS@BLMJb0UeTMHUiDP~hoD#WJD)K_Ml0Z;3 zNrBTvW3>u4wJDY)W3*StU1?ln;`FMPByHA|&yf zcv=iUlnhX=05Jf=z~}b`?B-VQG%TRTe842gxTThFP^?>Ud&xC5Q_H)qxcz2m8nS6= zg^|as=o%)2py4q$m@rJgpB~A2A(A!^sC0A zcZ(YT?&QkAcZOacf@&Z|ZTlGjMB4sU;{ia5xl1ot@iT8)sJPW|vmuyY6HME&YOG~I zWap}JH@j(CH6Dy64yGM^+jxjA^4|OKE4HR=j#-5+kwRvXofPk>)|%GwkDv734WADUy4%GUn&mVtJDpyhaXJKuSN z@9i7l+Xp*^0Ra>y+md{I^`;z@JGxu(1MPzY{OSJAo|gXed{_JV+WQhKv@)f<|1D&N zBB^$O$v)88(=H6O^qjsw6*Qh6n#S+;!&H#IvZ)Go^(j!((hY?wxv^^VTH4z9*1m2> zPcH=GN)dtDhoy~eR^WfCAEEAfK&L9(d4f)bZLzcA zcj$ee|0|gCqZ|y8aIq-&B9h(*qhe;(?VM^CMOe)x0g;3yCSz1U zD#s8smD{ixL3;;fMY3cBZ4;_kK&VC)0-zd_VcY$5yG&zlaK=2#2b|HAx{;+bWe|bO zR1QzBS~@n2u`!*imQ$c-O;%7K z=>d<>p41yitvp{jT{$*f*-_b3DKHj&;ACg7(B3~lK(>2|aWeD%>|9oW)j+gOCtBy zC|kC?n{1{uotV77f53ZeCuun^PJL?X(;hIbGP7^gUawtvE}Yo_6W8p*IrG+=O{NV= z(lR%+rqn&e989~Oww`ryJ-g(~g|8GY`M&l``TcJ1#pOmGT)`tnCf#b({tz+#X#HLi+&RamdIvY|CYPj_4vG zgL7~>+EJKzf`ZLpkSz$2=B@aDQo!ac&2x!e|8v}Jf_#vFdB#*WNTxhzW8$vRgguXJ z4}gENQTMu~_1)C$7l%GI^yx>}QY%BLmEqK?rL3=)yjl{pG+{czmOBAll9WSW$S~z% z{P|4^=EN>z^N2|Phwu6c=F#u%B>{6l-Sbfk`5Mf;U{UV-6)i+10>4prLJu*HbfI9KJTZ zF!r^g?Qvr#yhs0@nr_ zPZDMsj0@tgs9AvxZm>Gap(+CdhX>79wIyJ$^%QVq6bW^t_)CynDNq_%Xclu9a+MU0 zWFc8F3MRqaq}P&)PC~UCWgqpJ1TFoG;zUs4FmLhA6z22`Q<5>SP`CK&oT}8slwgYD z-=j6E+6vS|p!f|1loBP?;G6xtav%LwYPP9gkOwBV=*{@cVd<}kj|0@eDxeF7B8Mc= zbl;sV+Gsh5bWS^F(RP*vLBH~8B?7-tJ4;1t(f@RLQc;^7(tT{`NR58E@5L>?n8v>N z=*Re?_2YaIt|!t|_ZMh~_x}*L=+d5YE7W4*9bdDp*N zxr_e7%sjbYnex$S6`Sba7ZWMpDmC+@JIt7?Eg0nt&>>CtrxdaP>Bxv7?z(VTiHNGE zIjoM1b0$Z6|54>#^jE3i9@YFBJ)Z=3r=XEMNjPFHb=Y8E$nXl@_MkB=OteYO0p6 zcYD0GEV3S28cc`B#|Wg}Ssovdl$bh6Z;5o6Vzo!0>(R{5(r!x1H>aUWLk6&t@=-G% z(bT+Z{78pye59vuAVm4l6uE@R$9;Hw+;`<8Lp^z$qjx?y)DS;1%=*Xc4n&gN6CUW| zkYolt9ym?%Ou)AXaZ!BYZ}36LxBdry#Q#ab|BWEfG&BMSCQo|ZV^2d%$j%`<~8MMI4Tn1(PT&U(B&PHXA`?t{4Ev3X;4fX3Q-t*N~)*9Z1VY) zC&g*X(PpRbs%I3B_M=19N9${j4&eqKRgOp$A&S+YL{W zza-!6__+H@MDK<1Gbz_N$Dkhs(|joJUv^GSM$B%X8-vCi>ZP2JmC@Q<P4P#_%?RWB$5*zgW_tjP0N}C`PPbsbAnUwEN>sih7JY&QFQD?i zF9F6q&;6S*lcc-pyO!4pMGvmf7fbz@u_Q1z4*+vXGkFjS|s^P zDZ$J=tCqcBMQ*8dbT>U?&Tu!s=+?HI+ZJ1ImCSXnTl3bee8|eLScGev_>kglt3>Hm&kgUu1Xh3S`7cXYqG=ISWCQoV2`)4%VHv0ygoW*g5 zU$BsZfMvn4a57ll5==Y(wz2hBVCG;H-^oAyn~gk8e(8@3`Ty~YEUxCf=GPmU8rGsO zn11s?4n`N8y|J};NU3TsFwwECM zFNT(#i2BF9xm`QW|CF9hVWk~0cdW%-d$o7!TK3}Mhq>mi?WP}=8tLAy?W!~UaEF=h z_v*V28h+Shp!*}{?o`8%EE}}TV#X%mRIo>L7>ETpppf5;h03a>BTb;Qx{%y9AWu;7lTOmrces59D5h^3x?!qF4F}_+iwM5jMyP ztzeSf7=dnMPW!&1jfe0u5Eq(&o3Y3ZQHCa^| z&{kFjGF0dyxtm1vu>_jmzmvLmApd?vZ^;+s?m)fyLVx=SIKYH<7B38(hZ+L&m3gWK z$^caYf132TPWShnf}c&{qPbUepaH-r>+1ta{eWW7^-6_)mjd$fm<(;LF;}zsBZHaL zj_sBh$bSWy6CoUu9&r@3cT&1j2<&MR6u(6gpP-d_l3RQIAC6ZW@A(2=7ks=vNCw~1F z!Du^z=ebyA+;d;Q`0B;44u@@r=ZrtK=C0eaw}it4DvNJCb^WRD9|)HZ{NTWCXK?$$ zm6nypm632o%l8k2$_K)^jycP^F>4`dG3Uz#uM~vNoDbJM^46L2cUpowTW*)$w%tA+ zu0M(BP~9Wp(hH%y3nAl$Im_KlI0hLG_CFSU>}oJQ0B|oW#m~ONUpoR+3iScam>k z4W2$1EE)`120zvFell0F1A>QiID0Al@E4U_g+udUr0x;sMfZ)W^tJ-yo7VI;-m*!4 zfc;I{!B1cx#}1C3m9D7V4(gBC-K8tB!_DaKtWv|Y(^J`Tm$IucWiITb@t2?ljckdJ zjWZV1K1iZKW{4_Rq7dAYXp3T~R@5%RUaHg%QUkqUNW^_}=l~t`Ds%w+^R@{|#3O;t zaQiQav(kUYWNTN;T5>-e~JWmGyS;&B+8G}+AaPr1wSflAL-MUxM z{|}iFF{760NXA&g)oFC}CNL+YF$eUr1@@7Tw`RTt&PJ~WB+DDguQHkB$O;^gy+TEW z{C_GcD)qC(Ujk#|rT_TfS6<|KVbtmI+{c`NOjXY612e%1-`6ez5~n_vzcNLih9q99 z2~t1Ymi6^i+huoUduNHIDcc<}fXgTyoE&z?m+*n7m z!gLG>K{MYftKc-l$)3`;YWeC-yKJo9tj5&Aq)mi%S~Kr}G>6}{YwzxzJK=+J+wqzA0QU;GIBP9}VNgNjY#20ST_1@80vL!PUKSf}(Ur?qn z@0g*ZGsc?D_6&)4(oU5>LBY2vAWS5NC!r!P45NYJ68X?!4Ud=1zJZLej{>eS!i_g!5|V4eJFw2(Q!wlJFG(@u{S+>H8>*d%7!RiJT+UrK ztz-t%4!vzW{NXQJHQe?V4RhiCW_HGjV&f)iSC3K09>?8VLhxwPL%n^#>8#}K1HW2$ z{m?}@>5e3rIP13Z`TKr+>Ie+LKiU_;<= zBL5+7a@9fpL)hsQn_P$hNJwTSY(3e{fn??z#O(_t(M=Wmg+mZuz;OAJ7ms)kC!%;Z zlA{sw6%w#wMl(;@u{jPDTs{0~Z2CYE~MDQ)^eiR3%vk6mrWTJ|4NSIOPTeN7Nsu#xQA zT(e@wo6oGAT&jDgWdEDb$nHOBW-CfrL<+*4iPT`loe5EV*di5onUTy(PPJ3yG?8?Q zi%lj~DyqJ9OriiY$qmZp{*2^r+12G#`_rfsEIVVJq3HGG4CNPTT)smym-@3Jew9D} z{gN<$K3sGjzQdSLNXHw`go_>F0tftHVyjFqTeCI?YnoTLHwTT)k}ZpjzM_1|s}ptM zy)xiWv*Q5Bg4%NC;aO{$O&AeDfM10S13{bt%@yeUREkO%lkU}$?O&~|vX zW(*}=4``zl+9b4sq4{eZ7u`I_>jC0*LK4Me<46uPsT+sPTp`~>ICTivi*#@nxf786 z4vo0rG08iLeSmx>C7$Ah-fl7%2X8>}63nU!dP-1?!=V>O6`jzKzhDTR&LDH|1D9z< zbneI%Ot3r;(ljE(NU{Sjc;v0R!;4jSdO|JdZ?)WP!PAF6BVK>fl&_h`AH!`EH>Wnf zIT%HyIa@kCW-Wl0v)x$aY?p@Pj-j&mNDwJID{lTq-qP-cqIYcDzmX@eSn3v$ z>?z-)8H*!Ii1hF)Ny;er0nlQ6ds;`|tHTIi|r>Xpw4uwd_}b4r-* zrFaZwN?f78#$Ajero56Nl|o83Cn6d5jlw?a@)=~@ymEmZj=VOIx1F$b;2skf!>yicya!CMGkIM!1#y|GV z70HEq0uZVA){+SV#k+EX1=x+*AoijZ<^dF1Y&2oxW+sznD#z_GIFu$eM=3XcMkb*Q z#$=Qyq&2P{Q3FS*zm#%L(9}GQ%$uj?I@$C8ETO~X&9kt0Exj_7Ub#5_PI{d*6X7tq zIA|=Alr59x0ojB`5*%dEWga)olP^1=mrfrL3<3@l#AgG27H94wR^8{)-?isW4e>a|pu-nh|{)QTAGP3?=)Nj|P*?XBkssBOy&%r#Q@V zDVWmzug=h<_M!wJO4f`yJ$BWsHqCnW;q~kz>p2J4a~s!lj=rCooM)c1+{@&0i{XI4 zn*Fx5ay>nZRM^*xE7l9ELxnrvH|UBof63`GY?#}*#qg9ZQBmtTU`S6(lZvNpl{B^8`QU$n=tU?40`kXq?q@~cn@99P5F)NJ*Y(FHvw}o zPnhvNNgERFpRlmQm*^YxlsaLyrv)0@aH^3wO?)ARF(wN_qR0P9d?R35IGu-p@`TjD z-ed3KTZ*IpoMUqIwqvsmh;Da^aG~7e6&VNdI6Hq*kZdU7z~+TNh0lQI#+`Nh5b!NM zb~*E|V~qKC&MQvhUh;w0`Ib))uG^aKTGQwEeEQ0I zZu8yTDhUdiu`Ri#mRPqFv_qt>5J>>Ssp%NTXB)5&)S> zLMaGj?uw*?#S$m;A(RdTUbQ(l09jV1f?ak?^E@AArrAWEYQ){ zqb>>i5)z+enljK9*wmvvP-bC7sadUG9#WYl?c;>4QYB{&Egg8|+num^AY+6C;u z(&b=U!H#6?%bDa}v0f`=*XyArjMAj5UzNBiG1$8DqxZ@kP7{}}3x&#^kR#+a={)H| zo{+y&G94Lv<<-k`NLLwsuiU9{(iWA4 zI(MN*xkQc`9X3b0GK+-5s2fJ56hc3lv&Hm8y6Z|R9xQ_j&S6XB)=(&776BOLm_EP^ zjK9zawtmzz{!+ftPyD^OODI-&p~^2DGG-;5GbKAYrJV`in+0BZ|A+PioIob*39?}M zL(7&WjYd^t26wP6O;#x>D;{2?WdM|Hr<0$HsP`tY&~JE-O|Fc-+EJYQEiT~|5A zS9Ua4_B2-t&JsYe^EkDU$YO~*0jn&i41q|*Nx04_&?FNe4G6)IkB=a#W7B!>^*j~8 zzU7>9L#7LVfDkz1JYD&Gis~5umN=>s?k%PIfZDJ#Ep+MZ35sNA*Nff}I#kJ5Z#JLr zb3|QXoa#H?%%7A`f*M(m`VWs}R@1 z8+Wt0j@6wSPhL+X#`+jPa+PnHa*x1I6W==JokiJ77vVq=Ql8~cvs2cUE+IxdofU?B zn|lW3pCkT1tD$rR6nU9w=F2*H=d|46Wk?0Fs&oQoLyI^z3)OiP?eROWNMz%fOFG1z zMgjRrtSKLV#!d=_nC*mgpt-#sWwkTEQ*j4;NQ@8X)Me*YABik+1UboA(dR@~QeRTm z)>#oQA^f}`ofJYyD72|Eqza0BtK^UhQJ*rK%Qp|QAbBzbLb_vzy_q*NMSLQ4<&#UQ zr3joROkaS?Iw|VI^%mxd`ZQ_hpTcm_nbqkb z=SSTfwhLGxq~NR+=FS0H1d(L8lN=uPUy)9V2g-;e;z2RM^Sw{>pzKJpm+5}{-g^X} zgDa^T`~)nl!#w`FfvYpFKst4X%oJ#N+~a|M!_fp{}-FN|XF;{QPzCMm;@>FyZ{h@O_z*i~Eei21B@YStw? zroaJX?cD;9e!qVPBBd!eMkSKgF)(l%mG;1L1e)xTl)monmY$a34%Ab9INjGjfRn_i z0fw>J=^6OJl$Tf}1%s*7HIfdQF4!g0&@`87A4!sqoD)w`<$!K8ssSgSIdQmHI`v`_ zb_YyFaT_(}3rMB7>6})&L+WjdJSW&ed=U#HLS))7Vk93S@Zxh-yh@pPDmPU=VOp9V zA}qx~)J*#(*bIqi{l18H%pcK@c*m}?F~AA2FfV{zeZ)lV9Hti$qtXsBm-1^|TYKn~ zC?s(weOqDrM#b=dLIhDZ$O@cCrti?0iry_Kzjf#fhh9FiREU z)~XMOst>P>tc-@LkB18%d#n2R;;w&kz2UlZ;=9wqbHkw%!?(`h)m%@o@UfrT`1SNW za`A~XXD)^KldI`eN_o{<+5S-3{uS+tE>w0Xob$+AWrr5d{FC7g!|n0!rUs$ockKL) zC-7?SWH9HEd!V*T=5_1aif&citbz~6{_wV)u;S0oyK(ONxrGbiZ2P?Sr?zYyyqyi7 zag}RDjc*k-E*}pU?O)KYS5z*Je0lPf$>pjrL{F6mS9Y&Vteg*)om@ypRWkBf?QBbD zUoE(u8{#|PNk8?z4d44^9-Id(Oe~JvoV?||>0NS%iw~@v3>UX54a!+L8RFYk)7!yQ z-^MSRL-{+_3oF*Q@#{P4SN4Poj=gU(WTnlueUQdw<=r@V{otCdE@Z3w+T^zm22b|~ zZFNDL04aQKWw5F#nA1FW@?Da|f5E)CC!AM3*LfH2M!T3@F8jl|hcMz&5Qun2XwXy@DvI8r*;j)%+?s3TX@%qa3D~qP3-QnE&wcOpI z+}+DfE7RfJZWt$Q^@S8}Tj^rnmrGtL3G%ywxx0x!mAEn={sIj0BB$oVUzoYv3z}bl zki`{lho*gYQ6#Srn)ca6zxm(@m!3~pJ++%s^#9u%USaLyZh?8 z3bfxVZ!N^bosyOc{QRIi4dD+n%oG-wyX=M^R_f@!T0{4Cc3*4muG9T+Uut)??ngQe z?ti4$ckkqWRBfR9Ivw5bEbYnAZE^-cG#Zf~_vobda*k(A0T^CB@T}oYikHaOHx5RE z%05V}I11JCWPB&?in0(?lwMD)0!E1NAY<@ERZOH{N?ddWiKkP6Re};IvBLnVoCZXb zK4DH(onOsEtx_;FJm#qrg&v_yp`D`Z2KDdr5JEEXBybvN(r$@kI*eDPVAtK#?L%d6{CF z?h{jDqFtU9$dq0f%URyn$r>t>D+}}DpRg0iKmzwA*qfF#+tDLa^31?MQXTx>A#LI5YopXvVfhJni{x~cyS?Ncn~fB1;j z`Gn^9PR*a>A1~nkqNVhB7Wc+cJ?`Ns3DIw6X^-dW-pnyjOhM-HI^COfI>a-7H{xT| zb~A{83AjxFtRAK5rd$@>Fm~A-!Yx&%QG#>{F-{B|s&od-63o;@43*e4jj9SUl(3^E z8e;SfR3%iDm=vp$iqobIF(*Fjz!Z~LBU}t?%1B_qW9Lw%68|7&di3-5*f|x`vsFsVVCJpq0VP1= zAXIzOVEmqePsLrzPAN$>XVF@Z6(;j(g56<#NDXS(sTw;P4NQY2O_geCKogNUgFuo!!7;%aYtu&8$GoLOeBaQw|acQED`A- zoq6%^kmsYcxFoTqJ znFIz!uuR8LsyK|-OglHAt>b&&`*VsT2YBrC3p3?8(bs%Cj^ajf-_6L zi;@d4A|7*!#5Q@-H4SPa(9l@VpM-Bg*ZAy|lyE02xZewdW@>!{FR)WMc~B-#x_p6Z z0Vj+=LtA2mfTRMo1H$^kuDZrr@Icrpjr{J5Afvu&oyXdzuqG=51ch>K03=A?q zp#3caoqfIVRw{Hsc^fq+VHjhRK;*4&8i+@i`!1ec$mgp;ShG?PRVCn<)GxAR2FfH@N_roa7+qf_!uDi=X# zwv)~ab>8O8&_ln(le=t}C>4PrAnwQH;lLF#0gw(eMI(tX!IY?&fE=nE9;n3A(XLW9 z42aC=kU1iGR2JI0~zu_Q3CXY-(?Oo!Wiv$ zyv%|6a=4}?WhMCNI@#HMyl(){y#xIq&EY1tZu5dQn*+M?a-6^+MYo&9|A_kFK-TZj z5(2U&AD3~P?%qOxgT?7^d5Kgt%;GQ#k)7rWRda$g63i!oJOljVU*axeB!zNDr+=SH zfOWiUMxwK=^4!Epw@fb_jD<^H9FHC`Ob$QkoDv(UpqHr(1N8k7ag4rk1Q9;k7->dr zx+FnkHqOr(g-lWv^0QqmQYa%}3E$o&EGz|qq#~YfUp4L^8)MTK%NHhIu3acz3@lfC z)Bc(r#NzI@;I8&y+KE-;NlKFTMc?9{m#;2NF2THg@2YX%U0e2?@n=BEvKF6M%e7xS zIoASJ@%d8=Pb`csX8cv-qW?=xi#wM3m-WkMU$ZPd8_YbiYB|cZ$w3NjCt4?KzT?`n zi$!5;{oS(dU$(zu51o57Ts!mDxkrQjLU6(x8u5nfdX_|(cH5SZFJF1>soQ&Qw+8oj z1ow5`vEdcchiYfSWlzjoLC9w01ao$;rZ>Kuo*m2~YMtF2T}?lx#@w~#uGz}*f4!*c z`iY+vl?BUt!bQFFC+_AIL4DkozaV_>@KWU~mFw9B3%<`?TspDXzM7OVI$kJ&3HX&qnIl^u%cf%`?_WM*7gZ#;MXIdt*9aBlNl$9h%6{J=u>^+!XNil3q9+7_}FF5N6%%dZRN*M;*Nf_XcG zJC6r5TURY@)Uyq1ma_LRYPh_bjS&r((W;pfyBbjeu->pM8 z!8QqQt68!yw=M5m$qHs430aPOs{MT(S5!wmpFL;!@E0hn?gS6p%0zB?OiPzoY^OwI5C3RKH}NqOg{wgQ94+Vw+BM%aMpM8Xwo z)lSQhBvx1)q8H~y0oD`bbxn~e+$AhQ_;jqFg_;wLtAIE<;3Nj;vSrF?s@bv_vy+`* zqxp9}Z{#qu+JYv|*JU6*$P+RIANG730Cwv@ds}HBy@x3b`((XS8Fd!_3hRon1g8gL z1Sk9zNvN~9imJ|nYKX1x2{M6~T2(FC1*rBZRCST$?z6m<=g*miq}0yDhHtQkXZXNZ zPy+_}MY2s4AE5vnu8bovY~eL}{0a&dgA|;^qoN!ApXk*$DIlysBrlz!iYde>Ld)tx zX~QB79i#!wLPT{=yE(dwcC!i;^*r}WoP?f>lho(h2_|$eW|O`7mx>mx!G@M#=J8ca z>$)xX#?k9X7x#v3`-7JKitojtwbYtWY7NdMTFa>k<d{ z^$UHhT;droP2C;{+s+0pXHk>2)Y4EY4z*fKZ3v|{Flf9ZWZSXS9kw;W6)8(m5lXFC zL~VD6Qg^djP^n6z_MzIg13}9HK+Is~-c`#!PJd*GG~ZzYL0 zVpTH>+D%*{zaxXWgP&BkezHVwfX~dnF{DkPb(5fO*!+>UAZ}ps7)`5|+JuzYRjA0KqoxF%Le%*ck?gSvC~`D<_5H={uO8@mC_- z^}%dqzL9>ITG!)XBtb7Mk^WHZA+8Ew2K&u&Hr0od$$jvA(}6Y;b7tnBp`*ooxd4nq zZw#QHeBWRmQ5}6cPpU@+YjvosCvEUixjku8d(e(Vs|4&JrPi#iA9s}?-yeUKOjwyN z4QtIf#~AVHfqa1pbP0gMP@9g~!g=Q4%R_Qa*0`cb!7Kt8@sK=TV#k+JEb-(Apyy4( zat5uK1@=c)vl)(ET3QD>&$c)7M|klvWjjN`w-7{3POx$q>!jCi6Kg3mlW~)HTl{wl z2px^_0Zby-cxaXYhKUi%cAbLfD7cNlZk4d|&2SAzZL#w*6!CH@I6&k)O%`bzVS))I zd9N4+eUT)*ot^SW3`}Anev&>wl(IOFAfg+A&pKK!nnw9425&f&!zk{C1*Xea!E(d;eON^ zJnLNT9$DQr`d0VI8*R5+md`9d@tUyQzLH5Ua{jh1)IIXnuF-JySSWLBu9ba-r>|sx zsx#X8I~JcIYj7Hjn9iTPJ-s^e_}Yl)tr1UnWM*w75E==DN1hH&`QA-n0N&M zgB^1Juyk2Sm;CVZoL3l(1cZpcl&{8l!BO!C9BU8XII*PV7O*467tZelJCN^>4?7%5 zjFbev5q}M4I&^z!cH)9(xP(|vluTwRNHP(3CBk6_a}?MK{3=<6_(w2879lkeE)(^d z9rZ0t0=KiK`yl=`hfzo)SPqD9B_UKC#D~7Tf-^9PZytP{L40aqB1r%LxzUngJ=y_r zQF@@U#|hOyf;tkT+sBT{B=U_wKjUh#<0pCF>;yF67*{?H0}#Ij7!}B2t_)-|Xg5FN zf(0~+2&7Cy?j)P+1k#i{e9<*JIopnohQ#Pk`I;I*PQ$m-o0=MP%jl?AWIk^|^O9Pa*O|fk{E&huP-jcWsqbdBz=XKZF=EWg%|%bAr0TAU^Dt* zPyaY52UfSNe#eOF*QuPpqaYEEk&+0<5Whpo|BM1+j>lmP;`b2eBL+qc#vpOV^)yD2 z6qylD57P>M@Vj9L0NSS1KL9^KeoMvT(`!{tp{k~E)xmJe!Ifu%r-#-~kAzN-ginu! zkB$W|UD@EcXEdGKU(rvW_I>FGnBp^U> zHSByUWPf_q^31xeV6KnR=G#NI?Tb&}t_jwsZ-+1r~EZhA{AlVhCB+ z`sO-faf2(X#+~=$2A5VZy4Ehb-@51yUz}XKI2*b+8@_lsREgAA?^<)`tRLP>B?N)# zsr+MMPDhqSY{7Kd#Mw3?g5d>+FqdarmpkB7*wCY1C1Ei zHz4|>{Zu?b!M7=*jh!;QmEAym%8j{r=bJ20*tcCsVVtE%CjF*RNl{>%9Yhu11WtR!J_oQG@4%RN zcznq)5iq7+-+pcT>>;1AQ1SY?YcY2*3ACTz3uN#8KgSQg5y*lDy!w)XNj~ij5iR;5 zU|CH}Q*}HAmJ?zCT-6(u{7!I99`dGs^QeLiRf7&QO1v3%!Eg?2(}ALl4=}S5erDXz ze+FkvZIu=jcuvh*l^Q>wFPd4be^`m`#xNBf*o;l*TDKo+H?;6%nT7CTg18dwc3hft zHHT4{+CO0Fxq4-L;ScclCe^v-a*SXh7qw#35g^S6_d$#ub6dC9KR%vxt0u1 zsUk)9a!X%K30RA#bxVc`mKA^vW2*u_t}WR2eYkO2lM$ z$p@x#SkW#nhY(Oi#3aUo5`F*BQ|>;Yq7NcG%F74K0Rt*sY;h=GPk_IYmP;_3YrLzZ z7ix=S^p*jrM28@p8hrJOGO_}Le||hBIh|w^|7-ko(#yhp=v0PGI|#uGgM2VMe1QJi zop)&JO7SdC)47HI+)8h5r7yR7S{F5-aCria9nnzubsH}V8t?w6F}dc*I7QWvs+t?+eJ!eNdZcXkm()_#u~Y=` z<*^)SXCtZdrL$+)GaGq`Vku|fK4muT84PjDz_Y3XL*UV8(Bwqg1TlQ&2x5;C;)k&M zK{l@0Y+MNyix!Qd6@!{^ zeuS33VsQd`{NMz~%vyfr%@2d_b%1YK#mH9RN?_E^(TR!kJuX8cQ{aYzttP00b1Eo> zJ~=fq$y{B*`uL3(tow1c7K(&_gSF(+SD%LXwDbW=ty_pdyY&^842QE3G1wH91=p*gvCfQ~>YZ=^dfwKzR z=WQ4SYKJR7u}|`w#>A{K2vt_nrM9vL!0@-UaW^G0WdTXZOPZz~mxqghEZoL43AGel z!f;AbPdL;;gNg0+PO=UzyiK==`XQ81$p#7*5d>5B4PQPr0Yv~ZWhMwy=O<2witbqQ1L*}2;`OGN_|nTnTO*{0?pktYtCuVl0njKdrCv+vlBGOg z$zcUIc+)p{(m}d>FTFC5Tk6lP@a9(dayQL%1hR5x&%FFhpnTJ;@osj3Sh!^=d+VKS zYal1zZS+AYw!9{Mqh%?(HGCuf)k0RbW+}UtCF+;58(5-kDSJCh>{-h0Vu_BW>`s=T zij{;^(d&h`3OxpYPOUemHejvrTWh@5n)xEpS|eIpX7w;cJ3IAilZSt`Es$^Z=Qn!u z8|TXw__sE^pWpFGevw$*zLdWsC^Z&sf-!7=cC9zNb}4%^azCN&v@T`0A$<=Db&aTS6 z^1Y_~4@`ydHlDEVD`yTS`D(jmmz8@vYi9zY|7q*iOM7fd#{AlZ!r2Nj&T{@!(bTp9! z0bis%CNRNR81&%~7)Rq5bc0rfaFp`?E2Z9}-~$SR6nsR%|D=G(eUd%e?UZVv;D>TR zD7y0_;4$Cf?&#KsbP4+XyxSS#DE3UQusD?2sL%AIhd4xY=_?d1U*J>pt!{0ILo`41 z8AU6_yuN%Tu9Meig!V%$J=d*W;SjA<8IWT^;6@#YJg6Vh9I9t`@I)s+R}$h_V!m`m zN}!SgeSv2%#37m+T%l;Ctw!GlpJl1~W)B`fG>77dLgmT&1~;-0d60!DWKGZ)N_{Ex zOwM)OzD(UI3}xo(EuK=3^R_L-Aqk23!wV$~&V>zco%C+oze0JTZ5jFvZgdsVa@Ag5 zU*NWfIEv>sOYwqEip4{Zibsx#Cx*q-@HICfJ~<_xp7Nf!ARf8kJ#_Ii_DrZ>ldR8~ zv(0xbSQaMd4|wbF7UV2jTQMdW3`BE>=S$u^%90C1i(ABlcCoKtJlOwNZQfnSShjff zoD^TU{24tFdWMhF7tVErI7IUuD-?yy+4@!wnuut=e_`;g$5>MAKQ6_OV-#^~d5cZ| zEbq~VK;L_opWm>s`K?MREuL_M6HhqB$&2FUr$KIeMAJ!Bw<*4pXmIEFotn?sgUkJV zkv^Ly717+b`HnYRSrRRm;=$9O(XHiOe7t^ur`e$xbJ01UK0i30{brB1y5lpJ8A^)- zV4x~Q^HQ{6p+04V%mgyXB9K8A1DOnIW)ZEbji#(HRHf6eb8lYZ5Pfc{)30Nv0S5H0 z@b+(QKiK8cZmz1u*6=H=l3;vTh>9}ngfFoyK)_Fea?Qj=$AmC)1}-w>pJT(5XD5b~ zpJD41gLp)_@L^%s{XK|Y;ZAS@Z-@I4J1=OBhcO+4VmHaxY>5=4pX(yA!e@Bst^R?P$#`dukhXXEK zCSdb}bhsQ2>3O!lF!5ovEn}H#r+{AT#Hok-E*D6(ykH8fgL{POOS9T*Px}ldfztKU zi8FgBpCWC2q356jtiQM}X>OI@$skc41;~&^v;Zth5 zTAxJOShL-1g0*bobVA%2Jrv(9;T}lze7O%$5f1l( zV32LnJ;G;lQfdU-ufL!?3g`)@I>Q?JLJABrem_wvNVnBnYhAOo1cYpq{~-wF-<~U>|)LjVkh3N$%UmIyO!3$iMTDWF$`xCIw9U3 zCm7De1NM_7{IZ*g=nEscs7syZ0m>tQs)opX;n$J%QK1n(Zu7we*)OeA_%(8(s1X8v z&Sk@y;n!3-;1FqK(&5OXfv4~D6)sRyH{JP1JyI75y zeKlir2C{Huqmt}`04{IWG?H^MIV&pe;|Zi5bdO#MVxqR^Ft#+-Dnf*i{zsY)`rsqo zjELQg(R5;bLwQ(&xiRBhMeIZQULg%a4eG!kq-4H6erw#9vgvL`&3xA4khj7y60kAwOf&a9{_a1HpoFtxu8$)nU20^ch|Q33nn{|p zRoB+=?{Ppv9}giX?PhJMB=oE6VQB2YO$-jx_$B&h3$?pV`6*>Vp*l8;WivKmn>4-j z0c@$3a$+84C1IlD9Qf&@t;Hjt@D*Fn!e>xxF%G;J=yw%@VwL*Y3H9swR3jN}f<3WQ z1B2(zKz3<-s1jmAq|jSZTU*X1|)D=L{=dr(s%c^l}r<%;OWz^HBC~CQn;X zmPg*{(cw!dhOknaKYYcSsE( z0`iTs!%#=Z@7kK03ul|503_g3-(;;JYi88&)z`?_8T8}g$k60jJhZjuJX#_>R0%_Q zlM`de6^6$i^!NZCRyw=6aj>>dtz=8J%22l8oV|KlTWDOrKH@FZN~bWmYRWJ)ldZ+l z{Hx91+1e=JupSsELoIk~t;!3|w{?Wg(?OpKl!F$KUM9xRw7FiSwzFhQ^?B6?ABAi1 zNo&i13{+Tg#ci}6TdD`zNKY(sYiMH7iF*{$&9BwiQXSiWG&W{20EJ6FH8nIeFyfFj zy99x4EazxtYamSyb@pMwAr)aVUorclPMDaO6iirc6|~v3VP%q}_b1uqVM*^UUv}Gc zQXsX^pIYTjt@5STOvi;Z$(D*xKDV*flMPkfx`a?cn%NZM(h^KxGFIf(O;x2wWtwg?@hg%x#?j;v+uI2!aL&^Z)K}!0Q+cHNtE@ec+^hmJpR-e0Ylx z(lKf~Xw*uP%|d(7hJV%f#217_eFI=hNK1G@%n32D_>xtoFj@o)56ZdzaL-)(p=)Kr>$eVx2 zm(nX5dcPDb^r}eLH`iU-T=)M^*Olh1-Sy3NLJs<|7hM;!XpC5|ERhLqth-d(_%GkD zbY)5-4N~oyQJK`nIw21|B3(Y`+Oul+=G%QKheg9-X;y=}(<9J(-2qUDL0D-~Yw>sg zB@BfvR5*0Q{27xi zsYjAeq;rKUYZu^=Imj9%ldo`BGBP#-9B70CaJ#hKJ8WO+s$B3xIgD=%`(_>7r`5T# zy2fEQ1OfoZx(-?iq+vRS408gbFT`vMzeU}`?b z5-fznwj?1RRls}+{2#!9$?)iCFsZw}-?6J}|4xU!>)1}=IXnt99ju z(+yJ?v2##$!KC*6`wtx1+37fZ^x#fA`*aD(^nPMIz?(w&U9gWFjFb8h)PhySx=IM2 zBu$XQdJ7hlW9mG~5C}Fjv6t5W-=i9Ep&@*ha4muO{xeU`7A+a_1K>YjzIOTgGuNK+ zrL3FQk*tqWV#$zm&zSPZl|JLTyJ`92y853zy)fi$*t^*2Z8#{_9}*8877O}(X{57E zBHZTdt=C%p#v-q=$X)I?+Pp>^d5%1K9Y-CTuE&*f{NzBTtG)6V+;KRnDjyRDP@5}JnujV_Ucy=SQDl(= zF`VNBY8h1gU;}7827^m=4v@iY*Dym{2yg`2n4X|XEXMO!xVz@Gn{hA2iCH$!<~f7U z+&HcOo9u$=DbY~vZb86$9yDMzIEY^tGyzUvduo#XiaGNeP#@dQzXwQ*IS1m6t1p<4 z$8gA-xCY+4)@Iv7KxqtWI@{)3wsSrT{AP>48B&&*qWU%VB?S>5KR#pQO7tz^Gld*98Hoe*B#$; z%=WvTOGz6&7X!IP65H_uupQ$9g%v+_+;(^iTV{-(fCbs?&#Ut0Rl#GeFRRv{)#A-+ z`MG@|-{0QnZSV87AMv&J`?HSvvX0)(Efv?d&Y%8|6K_q3tvzDd0blMx(Q9d09qTM|1o@xn7SF2s@( zXt6MgRLoi;4%zegyH6CM@OcL*=~DY>N1)_ED+!-ns&&YO25I#Rjp9M0CCGn~;hg{^-lSJx6 z4t(OyLq%&+AZL)5XqI=ORO}ISw=cT|N6>VuKfTPGUgkdYvPGZTcSc%!52^Ve1UP8Pm(3^~QdiV9Q>@HI5FsMBbr+GEX7^yt{3yw2zJ%5eAKf|95em&mys?LIoFoD>TfmplX^HDZ{3mF-kaJa%*da_zGCHsw;0pw6dhd2URUz^rCXOg3BGl8GbYH5yuRVq26rPYp670Zm33GpH{3F~ zvwYb$n0G5EdcFTvzk779!&k6vCh-$%mEYRzwKl`rowai&>0UvFzo6b*P(Syr#l0UC z9GTe}$Ssk?8|N-AZuM3j^5ypWExqqqdSOVdu%8EVm}T{6+Ps-IPyO6hPqi5h}Q2{F_NXx%}^?J$FcASMP~G>6+50g~M)@)7|_Ii6G9dCfVqt^T}5 zZ(gGaXU&GDFp;mcoOdO$eKjn8UIBvd;2qy+ER>z?_xso@prh@tQ9}moe3d5rVb{=yoq&h2nDNbyKoAqJA;vRu z_hERXeTeNo^uVe^WJa{~d1PF>@!)GTuJw3CbzIZdNvAW9{ii2ypY)}*OveZEivPUo z%_?t7yJ%?tk|Bpzx3}}WD?MhEC5@|?A*NNr*4Z=?|C~dN&zTB7C4WMJ!Q%)hgXe8+ z52LvUKof%g z>f7LnZ#q~}`@cHMCU1h{khbYlV2vdG*qV+07Lbe_wnc_V(E1I=ks`v#Q0y34qO=nt z19SxC{~Zkw`G-?oz{C}*@ut@JQtPJU;0S#7w2XSQ!Kv89yNqmb!c>hIC{nQPAr$_7 z7c8_Il&?eNgK6a=_^ZSMB3%GAOqOuxMjquAA_B^(V43nz+jY61@ivlqQsvkmk%^CZ zd~5nSD5Cvf4vPY=B56gayk;rt8+ZAzn+n2;*S@}cY)h0P7Xl&F+^nPdyC~V zk(fwDy`_=ga;nb|wXNc;Wmmigf|L@c=0VY}=yxb12)wJmp;!JXSB;pp8FuZ~gpZB# z1O1mHZ2=jlQqGUWG5T@wYtI6?{3aIgVET<#ZX`=EJO8Y~{Aj&vvY#;H<6zTDUiCR5>FS~7t@sgMebQr!$ z?MYFsQtaMIyirU2;z_UfPMAhnbK}%tj5U$B`9?I^io1Qz%nc z8e;-jz^byb>k2uv^+E*ue3blE(9kkEFm`Hapen_?RM6TTq7SH2mg=FIr{CZ+TJp3NZ(SkLlI-5Wu6_1R0nJ#*-{+!;f2i8?#rnHfiykmX6ubsvSWMKl6$lJrEYim`xe_hO5f=I;{z*3EGOGX#6PUd!e{(ME4k!^~b9fPF9R`;pe& z?X%YXtYbd$&kihEIs>T8(&V)?%{MJsb_MdG2eV_LabcslzDLYEAX*OGqb6)$NSJ?0 zEZHY!?HA4aKewT~@26J$cBLZhqpTrdDRAfgseLZ>Cy#pei0ihB>GkiM>VHebiH6@f z#{eo^%;9>fcki*PT(;_v?k!NdKG zt5Fssm{!E18x!r+GO<4euTtJgtPRB>7{1@CR6!X}0aaAh%By@I&-1;sKd!p$`ldZ3 zBwEBV%3fTUK@#7?{MrlOqWwa&18`F)gr6}d6=6%!_h_*{j>1sE(!}dC?t+8* z)U~Ii)}+)1%~#e}A>wy$rm;NGs#v{z}>r`&}mgLj8ey5u31F{Gg)pne1#?23?~ zfjXim(xpaP0P|B(FGg8~Qy2Br8bSeKdmxAL?#NqI%1cI;>{_NW3W?g|Cb>;=`v*YB zifX??07sh?;;$O$00|;i4!g)G6B%O$k1e_`1-WGT8FFcP1R3C`%DItPzE4lt5_+Zx zr=)M}0R|*iXen5ssy7ZxbyZe7X_z-zCHR3}BT4TaLV1^=vQqjrRgIsa*Z<^-RF+KS zMdV$rcJ_o7rgG2Y>ulXtQ&U5>=HubzhyIN`0+KwDuFp>3o2R2PQ;u+R!jBFOp9fWX zczkdKPJhNmU=Wwu)kPkKMn{{h=fN&y!qRZlLn@(IM%F2knS~SGfsqONbi(5=^H*Fs z2D7vykfNKk8QD@`3Xx!FUv>)j0Dls~paV1TLeOmA+570uUPt?*?OprZckJH@R86Xt zsdxpoWN-h6Xem%Ce2_`tF&eMZjlqPV@GO?w-_dfLL_sjVjak>*uC>9*ikRHIWN5i- zFkUx&&oGnvqVbdX#1}VT-*#==^H0w_>5I>P&CUqe1GkQQ_WBCyKQ`vQXUtDTvtZ{KTLw{5P}XKG+3(w*;{CO=8ddN~_MUzkR~+WaF^n3e8;EGDEJ9m4-YFKzvj zdfjC-PVx!36Ri_yg$3h@;SSf&OnxJ1BW_6TAL)ue)TP0Ev8c0vshyI0#TZCVzhb;&Oqnsf z(B?N9OVwOS3Yobi_>(PO)@u_r zSK>Z3X)PMMo6MOu1(P-glePqtTY@R|!Q|TIr1*5rm6Xr3pXB+xWsbtn&uYJA;@2&6 z6n_3lE*$>cNlcwJd4|1-Tds6Or-9w+4SrpwSC<(uz_VrU%y1+KC2;zzWlpb;TS?aH zH_VKzaEMkCd42Zm(G?ESN}NWYJ`2T2L@Nf$M;g)RhIswPWoYJY1P>{*AseWs}4~rRK>+5umUGNQ&eEYsG8pug?k%L7SShKqR)#Fp$o}X0r_cHHxp&%kFDCffdzUfr zp2bR%=h1!JTS2EA>@BLS>sg&}l_PWecfnlT;SX z6_bnb@;m!`Mf(Zg{>P}nJ9yF7OikXwFFn@|-Z={PxWkd^PlUV)9L;49`d?dLV!ncS~34c*1sUls8P|1XLe BAW8rL literal 0 HcmV?d00001 diff --git a/buddai_v3.1.py b/archive/buddai_v3.1.py similarity index 100% rename from buddai_v3.1.py rename to archive/buddai_v3.1.py diff --git a/buddai_v3.2.py b/buddai_v3.2.py new file mode 100644 index 0000000..e3c2e79 --- /dev/null +++ b/buddai_v3.2.py @@ -0,0 +1,1305 @@ +#!/usr/bin/env python3 +""" +BuddAI Executive v3.1 - Modular Builder +BuddAI Executive v3.2 - Hardened Modular Builder +Breaks complex tasks into manageable chunks + +Author: James Gilbert +License: MIT +""" + +import sys +import json +import sqlite3 +from datetime import datetime +from pathlib import Path +import http.client +import re # noqa: F401 +from typing import Optional, List, Dict, Tuple, Union, Generator +import zipfile +import shutil +import queue + +# Server dependencies +try: + from fastapi import FastAPI, UploadFile, File, Header, WebSocket, WebSocketDisconnect + from fastapi.middleware.cors import CORSMiddleware + from fastapi.staticfiles import StaticFiles + from fastapi.responses import FileResponse, HTMLResponse + from pydantic import BaseModel + import uvicorn + SERVER_AVAILABLE = True +except ImportError: + SERVER_AVAILABLE = False + +# Configuration +OLLAMA_HOST = "localhost" +OLLAMA_PORT = 11434 +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "conversations.db" + +# Validation Config +MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB +MAX_UPLOAD_FILES = 10 +ALLOWED_TYPES = [ + "application/zip", "application/x-zip-compressed", "application/octet-stream", + "text/plain", "text/x-python", "text/javascript", "application/javascript", + "text/html", "text/css", "text/x-c", "text/x-c++src" +] + +# Models +MODELS = { + "fast": "qwen2.5-coder:1.5b", + "balanced": "qwen2.5-coder:3b" +} + +# Complexity triggers - if matched, break down the task +COMPLEX_TRIGGERS = [ + "complete", "entire", "full", "build entire", "build complete", + "with ble and", "with servo and", "including", "all of" +] + +# Module patterns we can detect +MODULE_PATTERNS = { + "ble": ["bluetooth", "ble", "wireless"], + "servo": ["servo", "flipper", "weapon"], + "motor": ["motor", "drive", "movement", "l298n"], + "safety": ["safety", "timeout", "failsafe", "emergency"], + "battery": ["battery", "voltage", "power monitor"], + "sensor": ["sensor", "distance", "proximity"] +} + +# --- Connection Pooling --- +class OllamaConnectionPool: + def __init__(self, host: str, port: int, max_size: int = 10): + self.host = host + self.port = port + self.pool: queue.Queue = queue.Queue(maxsize=max_size) + + def get_connection(self) -> http.client.HTTPConnection: + try: + return self.pool.get_nowait() + except queue.Empty: + return http.client.HTTPConnection(self.host, self.port, timeout=90) + + def return_connection(self, conn: http.client.HTTPConnection): + try: + self.pool.put_nowait(conn) + except queue.Full: + conn.close() + +OLLAMA_POOL = OllamaConnectionPool(OLLAMA_HOST, OLLAMA_PORT) + + +# --- Shadow Suggestion Engine --- +class ShadowSuggestionEngine: + """Proactively suggests modules/settings based on user/project history.""" + def __init__(self, db_path: Path, user_id: str = "default"): + self.db_path = db_path + self.user_id = user_id + + def lookup_recent_module_usage(self, module: str, limit: int = 5) -> List[Tuple[str, str, str]]: + """Look up recent usage patterns for a module from repo_index.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute( + """ + SELECT file_path, content, last_modified FROM repo_index + WHERE (function_name LIKE ? OR file_path LIKE ?) AND user_id = ? + ORDER BY last_modified DESC LIMIT ? + """, + (f"%{module}%", f"%{module}%", self.user_id, limit) + ) + results = cursor.fetchall() + conn.close() + return results + + def suggest_for_module(self, module: str) -> Optional[str]: + """Return a proactive suggestion string for a module if pattern detected.""" + history = self.lookup_recent_module_usage(module) + if not history: + return None + # Example: For 'motor', look for L298N and PWM frequency + l298n_count = 0 + pwm_freqs = [] + for _, content, _ in history: + if "L298N" in content or "l298n" in content: + l298n_count += 1 + pwm_matches = re.findall(r'PWM_FREQ\s*=\s*(\d+)', content) + pwm_freqs.extend([int(f) for f in pwm_matches]) + # Also look for explicit frequency in analogWrite or ledcSetup + freq_matches = re.findall(r'(?:ledcSetup|analogWrite)\s*\([^,]+,\s*[^,]+,\s*(\d+)\)', content) + pwm_freqs.extend([int(f) for f in freq_matches if f.isdigit()]) + if l298n_count >= 2: + freq = max(set(pwm_freqs), key=pwm_freqs.count) if pwm_freqs else 500 + return f"I see you usually use the L298N with a {freq}Hz PWM frequency on the ESP32-C3. Should I prep that module?" + return None + + def get_proactive_suggestion(self, user_input: str) -> Optional[str]: + """ + V3.0 Proactive Hook: + 1. Identify "Concept" (e.g., 'flipper') + 2. Query repo_index for James's most frequent companion modules + 3. If 'flipper' often appears with 'safety_timeout', suggest it. + """ + # 1. Identify Concepts + input_lower = user_input.lower() + detected_modules = [] + for module, keywords in MODULE_PATTERNS.items(): + if any(kw in input_lower for kw in keywords): + detected_modules.append(module) + + if not detected_modules: + return None + + # 2. Query repo_index for correlations + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + suggestions = [] + for module in detected_modules: + # Find files containing this module (simple heuristic) + cursor.execute("SELECT content FROM repo_index WHERE content LIKE ? AND user_id = ? LIMIT 10", (f"%{module}%", self.user_id)) + rows = cursor.fetchall() + if not rows: continue + + # Check for companion modules + companions = {} + for (content,) in rows: + content_lower = content.lower() + for other_mod, other_kws in MODULE_PATTERNS.items(): + if other_mod != module and other_mod not in detected_modules: + if any(kw in content_lower for kw in other_kws): + companions[other_mod] = companions.get(other_mod, 0) + 1 + + # 3. Suggest if frequent (>50% correlation in sample) + for other_mod, count in companions.items(): + if count >= len(rows) * 0.5: + suggestions.append(f"I noticed '{module}' often appears with '{other_mod}' in your repos. Want to include that?") + + conn.close() + return " ".join(list(set(suggestions))) if suggestions else None + + def get_all_suggestions(self, user_input: str, generated_code: str) -> List[str]: + """Aggregate all proactive suggestions into a list.""" + suggestions = [] + + # 1. Companion Modules + companion = self.get_proactive_suggestion(user_input) + if companion: + suggestions.append(companion) + + # 2. Module Settings + input_lower = user_input.lower() + for module, keywords in MODULE_PATTERNS.items(): + if any(kw in input_lower for kw in keywords): + s = self.suggest_for_module(module) + if s: + suggestions.append(s) + + # 3. Forge Theory Check + if ("motor" in input_lower or "servo" in input_lower) and "applyForge" not in generated_code: + suggestions.append("Apply Forge Theory smoothing to movement?") + + # 4. Safety Check (L298N) + if "L298N" in generated_code and "safety" not in generated_code.lower(): + suggestions.append("Drive system lacks safety timeout (GilBot_V2 uses 5s failsafe). Add that?") + + return suggestions + + +class BuddAI: + """Executive with task breakdown""" + + def is_search_query(self, message: str) -> bool: + """Check if this is a search query that should query repo_index""" + message_lower = message.lower() + search_triggers = [ + "show me", "find", "search for", "list all", + "what functions", "which repos", "do i have", + "where did i", "have i used", "examples of", + "show all", "display" + ] + return any(trigger in message_lower for trigger in search_triggers) + + def search_repositories(self, query: str) -> str: + """Search repo_index for relevant functions and code""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM repo_index WHERE user_id = ?", (self.user_id,)) + count = cursor.fetchone()[0] + print(f"\nšŸ” Searching {count} indexed functions...\n") + + # Extract keywords from query + keywords = re.findall(r'\b\w{4,}\b', query.lower()) + # Add specific search terms + specific_terms = [] + if "exponential" in query.lower() or "decay" in query.lower(): + specific_terms.append("applyForge") + specific_terms.append("exp(") + if "forge" in query.lower(): + specific_terms.append("Forge") + keywords.extend(specific_terms) + + if not keywords: + print("āŒ No search terms found") + conn.close() + return "No search terms provided." + + # Build parameterized query + conditions = [] + params = [] + for keyword in keywords: + conditions.append("(function_name LIKE ? OR content LIKE ? OR repo_name LIKE ?)") + params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"]) + + sql = f"SELECT repo_name, file_path, function_name, content FROM repo_index WHERE ({' OR '.join(conditions)}) AND user_id = ? ORDER BY last_modified DESC LIMIT 10" + params.append(self.user_id) + + cursor.execute(sql, params) + results = cursor.fetchall() + conn.close() + if not results: + return f"āŒ No functions found matching: {', '.join(keywords)}\n\nTry: /index to index more repositories" + # Format results + output = f"āœ… Found {len(results)} matches for: {', '.join(set(keywords))}\n\n" + for i, (repo, file_path, func, content) in enumerate(results, 1): + # Extract relevant snippet + lines = content.split('\n') + snippet_lines = [] + for line in lines[:30]: # First 30 lines + if any(kw in line.lower() for kw in keywords): + snippet_lines.append(line) + if len(snippet_lines) >= 10: + break + if not snippet_lines: + snippet_lines = lines[:10] + snippet = '\n'.join(snippet_lines) + output += f"**{i}. {func}()** in {repo}\n" + output += f" šŸ“ {Path(file_path).name}\n" + output += f"\n```cpp\n{snippet}\n```\n" + output += f" ---\n\n" + return output + + def __init__(self, user_id: str = "default", server_mode: bool = False): + self.user_id = user_id + self.ensure_data_dir() + self.init_database() + self.session_id = self.create_session() + self.server_mode = server_mode + self.context_messages = [] + self.shadow_engine = ShadowSuggestionEngine(DB_PATH, self.user_id) + + print("BuddAI Executive v3.1 - Modular Builder") + print("=" * 50) + print(f"Session: {self.session_id}") + print(f"FAST (5-10s) | BALANCED (15-30s)") + print(f"Smart task breakdown for complex requests") + print("=" * 50) + print("\nCommands: /fast, /balanced, /help, exit\n") + + def ensure_data_dir(self) -> None: + DATA_DIR.mkdir(exist_ok=True) + + def init_database(self) -> None: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + user_id TEXT, + started_at TIMESTAMP, + ended_at TIMESTAMP, + title TEXT + ) + """) + + try: + cursor.execute("ALTER TABLE sessions ADD COLUMN title TEXT") + except sqlite3.OperationalError: + pass + + try: + cursor.execute("ALTER TABLE sessions ADD COLUMN user_id TEXT") + except sqlite3.OperationalError: + pass + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT, + role TEXT, + content TEXT, + timestamp TIMESTAMP + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS repo_index ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT, + file_path TEXT, + repo_name TEXT, + function_name TEXT, + content TEXT, + last_modified TIMESTAMP + ) + """) + + try: + cursor.execute("ALTER TABLE repo_index ADD COLUMN user_id TEXT") + except sqlite3.OperationalError: + pass + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS style_preferences ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT, + category TEXT, + preference TEXT, + confidence FLOAT, + extracted_at TIMESTAMP + ) + """) + + try: + cursor.execute("ALTER TABLE style_preferences ADD COLUMN user_id TEXT") + except sqlite3.OperationalError: + pass + + conn.commit() + conn.close() + + def create_session(self) -> str: + now = datetime.now() + base_id = now.strftime("%Y%m%d_%H%M%S") + session_id = base_id + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + counter = 0 + while True: + try: + cursor.execute( + "INSERT INTO sessions (session_id, user_id, started_at) VALUES (?, ?, ?)", + (session_id, self.user_id, now.isoformat()) + ) + conn.commit() + break + except sqlite3.IntegrityError: + counter += 1 + session_id = f"{base_id}_{counter}" + + conn.close() + return session_id + + def end_session(self) -> None: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "UPDATE sessions SET ended_at = ? WHERE session_id = ?", + (datetime.now().isoformat(), self.session_id) + ) + conn.commit() + conn.close() + + def save_message(self, role: str, content: str) -> None: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)", + (self.session_id, role, content, datetime.now().isoformat()) + ) + conn.commit() + conn.close() + + def index_local_repositories(self, root_path: str) -> None: + """Crawl directories and index .py, .ino, and .cpp files""" + import ast + + print(f"\nšŸ” Indexing repositories in: {root_path}") + path = Path(root_path) + + if not path.exists(): + print(f"āŒ Path not found: {root_path}") + return + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + count = 0 + + for file_path in path.rglob('*'): + if file_path.is_file() and file_path.suffix in ['.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']: + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + + functions = [] + + # Python parsing + if file_path.suffix == '.py': + try: + tree = ast.parse(content) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + functions.append(node.name) + except: + pass + + # C++/Arduino parsing + elif file_path.suffix in ['.ino', '.cpp', '.h']: + matches = re.findall(r'\b(?:void|int|bool|float|double|String|char)\s+(\w+)\s*\(', content) + functions.extend(matches) + + # JS/Web parsing + elif file_path.suffix in ['.js', '.jsx']: + matches = re.findall(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s*)?\(?.*?\)?\s*=>)', content) + functions.extend([m[0] or m[1] for m in matches if m[0] or m[1]]) + + # HTML/CSS - Index as whole file + elif file_path.suffix in ['.html', '.css']: + functions.append("file_content") + + # Determine repo name + try: + repo_name = file_path.relative_to(path).parts[0] + except: + repo_name = "unknown" + + timestamp = datetime.fromtimestamp(file_path.stat().st_mtime) + + for func in functions: + cursor.execute(""" + INSERT INTO repo_index (user_id, file_path, repo_name, function_name, content, last_modified) + VALUES (?, ?, ?, ?, ?, ?) + """, (self.user_id, str(file_path), repo_name, func, content, timestamp.isoformat())) + count += 1 + + except Exception: + pass + + conn.commit() + conn.close() + print(f"āœ… Indexed {count} functions across repositories") + + def retrieve_style_context(self, message: str) -> str: + """Search repo_index for code snippets matching the request""" + # Extract potential keywords (nouns/modules) + keywords = re.findall(r'\b\w{4,}\b', message.lower()) + if not keywords: + return "" + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Build a search query for function names or repo names + search_terms = " OR ".join([f"function_name LIKE '%{k}%'" for k in keywords]) + search_terms += " OR " + " OR ".join([f"repo_name LIKE '%{k}%'" for k in keywords]) + + query = f"SELECT repo_name, function_name, content FROM repo_index WHERE ({search_terms}) AND user_id = ? LIMIT 2" + + cursor.execute(query, (self.user_id,)) + results = cursor.fetchall() + conn.close() + + if not results: + return "" + + context_block = "\n[REFERENCE STYLE FROM JAMES'S PAST PROJECTS]\n" + for repo, func, content in results: + # Just grab the first 500 chars of the file to save context window + snippet = content[:500] + "..." + context_block += f"Repo: {repo} | Function: {func}\nCode:\n{snippet}\n---\n" + + return context_block + + def scan_style_signature(self) -> None: + """V3.0: Analyze repo_index to extract style preferences.""" + print("\nšŸ•µļø Scanning repositories for style signature...") + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Get a sample of code + cursor.execute("SELECT content FROM repo_index WHERE user_id = ? ORDER BY RANDOM() LIMIT 5", (self.user_id,)) + rows = cursor.fetchall() + + if not rows: + print("āŒ No code indexed. Run /index first.") + conn.close() + return + + code_sample = "\n---\n".join([r[0][:1000] for r in rows]) + + prompt = f"""Analyze this code sample from James's repositories. + Extract 3 distinct coding preferences or patterns. + Format: Category: Preference + + Examples: + - Serial: Uses 115200 baud + - Safety: Uses non-blocking millis() + - Pins: Prefers #define over const int + + Code Sample: + {code_sample} + """ + + print("⚔ Analyzing with BALANCED model...") + summary = self.call_model("balanced", prompt) + + # Store in DB + timestamp = datetime.now().isoformat() + lines = summary.split('\n') + for line in lines: + if ':' in line: + parts = line.split(':', 1) + category = parts[0].strip('- *') + pref = parts[1].strip() + cursor.execute( + "INSERT INTO style_preferences (user_id, category, preference, confidence, extracted_at) VALUES (?, ?, ?, ?, ?, ?)", + (self.user_id, category, pref, 0.8, timestamp) + ) + + conn.commit() + conn.close() + print(f"\nāœ… Style Signature Updated:\n{summary}\n") + + def is_simple_question(self, message: str) -> bool: + """Check if this is a simple question that should use FAST model""" + message_lower = message.lower() + + simple_triggers = [ + "what is", "what's", "who is", "who's", "when is", + "how do i", "can you explain", "tell me about", + "what are", "where is" + ] + + # Also check if it's just a question without code keywords + code_keywords = ["generate", "create", "write", "build", "code", "function"] + + has_simple_trigger = any(trigger in message_lower for trigger in simple_triggers) + has_code_keyword = any(keyword in message_lower for keyword in code_keywords) + + # Simple if: has simple trigger AND no code keywords + return has_simple_trigger and not has_code_keyword + + def is_complex(self, message: str) -> bool: + """Check if request is too complex and should be broken down""" + message_lower = message.lower() + + # Count complexity triggers + trigger_count = sum(1 for trigger in COMPLEX_TRIGGERS if trigger in message_lower) + + # Count how many modules mentioned + module_count = 0 + for module, keywords in MODULE_PATTERNS.items(): + if any(kw in message_lower for kw in keywords): + module_count += 1 + + # Complex if: multiple triggers OR 3+ modules mentioned + return trigger_count >= 2 or module_count >= 3 + + def extract_modules(self, message: str) -> List[str]: + """Extract which modules are needed""" + message_lower = message.lower() + needed_modules = [] + + for module, keywords in MODULE_PATTERNS.items(): + if any(kw in message_lower for kw in keywords): + needed_modules.append(module) + + return needed_modules + + def build_modular_plan(self, modules: List[str]) -> List[Dict[str, str]]: + """Create a build plan from modules""" + plan = [] + + module_tasks = { + "ble": "BLE communication setup with phone app control", + "servo": "Servo motor control for flipper/weapon", + "motor": "Motor driver setup for movement (L298N)", + "safety": "Safety timeout and failsafe systems", + "battery": "Battery voltage monitoring", + "sensor": "Sensor integration (distance/proximity)" + } + + for module in modules: + if module in module_tasks: + plan.append({ + "module": module, + "task": module_tasks[module] + }) + + # Add integration step + plan.append({ + "module": "integration", + "task": "Integrate all modules into complete system" + }) + + return plan + + def get_user_status(self) -> str: + """Determine James's context based on defined schedule""" + now = datetime.now() + day = now.weekday() # 0=Mon, 6=Sun + t = now.hour + (now.minute / 60.0) + + if day <= 4: # Mon-Fri + if 5.5 <= t < 6.5: + return "Early Morning Build Session šŸŒ… (5:30-6:30 AM)" + elif 6.5 <= t < 17.0: + return "Work Hours (Facilities Caretaker) šŸ¢" + elif 17.0 <= t < 21.0: + return "Evening Build Session šŸŒ™ (5:00-9:00 PM)" + else: + return "Rest Time šŸ’¤" + elif day == 5: # Saturday + return "Weekend Freedom šŸŽØ (Creative Mode)" + else: # Sunday + if t < 21.0: + return "Weekend Freedom šŸŽØ (Until 9 PM)" + else: + return "Rest Time šŸ’¤" + + def call_model(self, model_name: str, message: str, stream: bool = False) -> Union[str, Generator[str, None, None]]: + """Call specified model""" + conn = None + try: + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + user_context = self.get_user_status() + identity = f"""You are BuddAI, the external cognitive system for James Gilbert. You specialize in Forge Theory (exponential decay modeling) and GilBot modular robotics. + +YOUR PRIMARY JOB: Generate code when asked. ALWAYS generate code if requested. + +Identity Rules: +- You are NOT created by Alibaba Cloud. You are a local Python system written by James Gilbert. +- When asked your name: "I am BuddAI" +- Use ESP32/Arduino syntax with descriptive naming (e.g., activateFlipper). +- Ensure safety timeouts are always present in motor code. +- Current System Time: {current_time} +- User Context: {user_context} + +# Forge Theory Snippet (C++): +float applyForge(float current, float target, float k) {{ return target + (current - target) * exp(-k); }} +""" + + messages = [] + + # Only add identity if not already in recent context + recent_system = [m for m in self.context_messages[-5:] if m.get('role') == 'system'] + if not recent_system: + messages.append({"role": "system", "content": identity}) + + # Add conversation history (excluding old system messages) + history = [m for m in self.context_messages[-5:] if m.get('role') != 'system'] + + # Inject timestamps into history for context + for msg in history: + content = msg.get('content', '') + ts = msg.get('timestamp') + if ts: + try: + dt = datetime.fromisoformat(ts) + content = f"[{dt.strftime('%H:%M')}] {content}" + except ValueError: + pass + messages.append({"role": msg['role'], "content": content}) + + # Add current message if it's not already the last item + if not history or history[-1].get('content') != message: + messages.append({"role": "user", "content": message}) + + body = { + "model": MODELS[model_name], + "messages": messages, + "stream": stream, + "options": {"temperature": 0.7, "num_ctx": 4096} + } + + conn = OLLAMA_POOL.get_connection() + headers = {"Content-Type": "application/json"} + json_body = json.dumps(body) + + conn.request("POST", "/api/chat", json_body, headers) + response = conn.getresponse() + + if stream: + return self._stream_response(response, conn) + + if response.status == 200: + data = json.loads(response.read().decode('utf-8')) + OLLAMA_POOL.return_connection(conn) + return data.get("message", {}).get("content", "No response") + else: + conn.close() + return f"Error: {response.status}" + + except Exception as e: + if conn: + conn.close() + return f"Error: {str(e)}" + + def _stream_response(self, response, conn) -> Generator[str, None, None]: + """Yield chunks from HTTP response""" + fully_consumed = False + try: + while True: + line = response.readline() + if not line: break + try: + data = json.loads(line.decode('utf-8')) + if "message" in data: + content = data["message"].get("content", "") + if content: yield content + if data.get("done"): + fully_consumed = True + break + except: pass + finally: + if fully_consumed: + OLLAMA_POOL.return_connection(conn) + else: + conn.close() + + def execute_modular_build(self, _: str, modules: List[str], plan: List[Dict[str, str]], forge_mode: str = "2") -> str: + """Execute build plan step by step""" + print(f"\nšŸ”Ø MODULAR BUILD MODE") + print(f"Detected {len(modules)} modules: {', '.join(modules)}") + print(f"Breaking into {len(plan)} steps...\n") + + all_code = {} + + for i, step in enumerate(plan, 1): + print(f"šŸ“¦ Step {i}/{len(plan)}: {step['task']}") + print("⚔ Building...\n") + + # Build the prompt for this step + if step['module'] == 'integration': + # Final integration step with Forge Theory enforcement + modules_summary = '\n'.join([f"- {m}: {all_code[m][:150]}..." for m in modules if m in all_code]) + + # Ask James for the 'vibe' of the robot + print("\n⚔ FORGE THEORY TUNING:") + print("1. Aggressive (k=0.3) - High snap, combat ready") + print("2. Balanced (k=0.1) - Standard movement") + print("3. Graceful (k=0.03) - Roasting / Smooth curves") + + if self.server_mode: + choice = forge_mode + else: + choice = input("Select Forge Constant [1-3, default 2]: ") + + k_val = "0.1" + if choice == "1": k_val = "0.3" + elif choice == "3": k_val = "0.03" + + prompt = f"""INTEGRATION TASK: Combine modules into a cohesive GilBot system. + + [MODULES] + {modules_summary} + + [FORGE PARAMETERS] + Set k = {k_val} for all applyForge() calls. + + [REQUIREMENTS] + 1. Implement applyForge() math helper. + 2. Use k={k_val} to smooth motor and servo transitions. + 3. Ensure naming matches James's style: activateFlipper(), setMotors(). + """ + else: + # Individual module + prompt = f"Generate ESP32-C3 code for: {step['task']}. Keep it modular with clear comments." + + # Call balanced model for each module + response = self.call_model("balanced", prompt) + all_code[step['module']] = response + + print(f"āœ… {step['module'].upper()} module complete\n") + print("-" * 50 + "\n") + + # Compile final response + final = "# COMPLETE GILBOT CONTROLLER - MODULAR BUILD\n\n" + for module, code in all_code.items(): + final += f"## {module.upper()} MODULE\n{code}\n\n" + + return final + + def apply_style_signature(self, generated_code: str) -> str: + """Refine generated code to match James's specific naming and safety patterns""" + # 1. Check for James's common function names (e.g., setupMotors vs init_motors) + # 2. Ensure Forge Theory helpers are present if motion is detected + # 3. Append a 'Proactive Note' if a common companion module is missing + + return generated_code + + def _route_request(self, user_message: str, force_model: Optional[str], forge_mode: str) -> str: + """Route the request to the appropriate model or handler.""" + # Determine model based on complexity + if force_model: + model = force_model + print(f"\n⚔ Using {model.upper()} model (forced)...") + return self.call_model(model, user_message) + elif self.is_complex(user_message): + modules = self.extract_modules(user_message) + plan = self.build_modular_plan(modules) + print("\n" + "=" * 50) + print("šŸŽÆ COMPLEX REQUEST DETECTED!") + print(f"Modules needed: {', '.join(modules)}") + print(f"Breaking into {len(plan)} manageable steps") + print("=" * 50) + return self.execute_modular_build(user_message, modules, plan, forge_mode) + elif self.is_search_query(user_message): + # This is a search query - query the database + return self.search_repositories(user_message) + elif self.is_simple_question(user_message): + print("\n⚔ Using FAST model (simple question)...") + return self.call_model("fast", user_message) + else: + print("\nāš–ļø Using BALANCED model...") + return self.call_model("balanced", user_message) + + def chat_stream(self, user_message: str, force_model: Optional[str] = None, forge_mode: str = "2") -> Generator[str, None, None]: + """Streaming version of chat""" + style_context = self.retrieve_style_context(user_message) + if style_context: + self.context_messages.append({"role": "system", "content": style_context}) + + self.save_message("user", user_message) + self.context_messages.append({"role": "user", "content": user_message, "timestamp": datetime.now().isoformat()}) + + full_response = "" + + # Route and stream + if force_model: + iterator = self.call_model(force_model, user_message, stream=True) + elif self.is_complex(user_message): + # Complex builds are not streamed token-by-token in this version + # We yield the final result as one chunk + modules = self.extract_modules(user_message) + plan = self.build_modular_plan(modules) + result = self.execute_modular_build(user_message, modules, plan, forge_mode) + iterator = [result] + elif self.is_search_query(user_message): + result = self.search_repositories(user_message) + iterator = [result] + elif self.is_simple_question(user_message): + iterator = self.call_model("fast", user_message, stream=True) + else: + iterator = self.call_model("balanced", user_message, stream=True) + + for chunk in iterator: + full_response += chunk + yield chunk + + # Suggestions + suggestions = self.shadow_engine.get_all_suggestions(user_message, full_response) + if suggestions: + bar = "\n\nPROACTIVE: > " + " ".join([f"{i+1}. {s}" for i, s in enumerate(suggestions)]) + full_response += bar + yield bar + + self.save_message("assistant", full_response) + self.context_messages.append({"role": "assistant", "content": full_response, "timestamp": datetime.now().isoformat()}) + + # --- Main Chat Method --- + def chat(self, user_message: str, force_model: Optional[str] = None, forge_mode: str = "2") -> str: + """Main chat with smart routing and shadow suggestions""" + style_context = self.retrieve_style_context(user_message) + if style_context: + self.context_messages.append({"role": "system", "content": style_context}) + + + self.save_message("user", user_message) + self.context_messages.append({"role": "user", "content": user_message, "timestamp": datetime.now().isoformat()}) + + # Direct Schedule Check + if "what should i be doing" in user_message.lower() or "my schedule" in user_message.lower() or "schedule check" in user_message.lower(): + status = self.get_user_status() + response = f"šŸ“… **Schedule Check**\nAccording to your protocol, you should be: **{status}**" + print(f"ā° Schedule check triggered: {status}") + self.save_message("assistant", response) + self.context_messages.append({"role": "assistant", "content": response, "timestamp": datetime.now().isoformat()}) + return response + + response = self._route_request(user_message, force_model, forge_mode) + + # Apply Style Guard + response = self.apply_style_signature(response) + + # Generate Suggestion Bar + suggestions = self.shadow_engine.get_all_suggestions(user_message, response) + if suggestions: + bar = "\n\nPROACTIVE: > " + " ".join([f"{i+1}. {s}" for i, s in enumerate(suggestions)]) + response += bar + + self.save_message("assistant", response) + self.context_messages.append({"role": "assistant", "content": response, "timestamp": datetime.now().isoformat()}) + + return response + + def get_sessions(self, limit: int = 20) -> List[Dict[str, str]]: + """Retrieve recent sessions from DB""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT session_id, started_at, title FROM sessions WHERE user_id = ? ORDER BY started_at DESC LIMIT ?", (self.user_id, limit)) + rows = cursor.fetchall() + conn.close() + return [{"id": r[0], "date": r[1], "title": r[2] if len(r) > 2 else None} for r in rows] + + def rename_session(self, session_id: str, new_title: str) -> None: + """Rename a session""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("UPDATE sessions SET title = ? WHERE session_id = ? AND user_id = ?", (new_title, session_id, self.user_id)) + conn.commit() + conn.close() + + def delete_session(self, session_id: str) -> None: + """Delete a session and its messages""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("DELETE FROM sessions WHERE session_id = ? AND user_id = ?", (session_id, self.user_id)) + if cursor.rowcount > 0: + cursor.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) + conn.commit() + conn.close() + + def load_session(self, session_id: str) -> List[Dict[str, str]]: + """Load a specific session context""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute("SELECT 1 FROM sessions WHERE session_id = ? AND user_id = ?", (session_id, self.user_id)) + if not cursor.fetchone(): + conn.close() + return [] + + cursor.execute("SELECT role, content, timestamp FROM messages WHERE session_id = ? ORDER BY id ASC", (session_id,)) + rows = cursor.fetchall() + conn.close() + + self.session_id = session_id + self.context_messages = [] + loaded_history = [] + for role, content, ts in rows: + msg = {"role": role, "content": content, "timestamp": ts} + self.context_messages.append(msg) + loaded_history.append(msg) + return loaded_history + + def start_new_session(self) -> str: + """Reset context and start new session""" + self.session_id = self.create_session() + self.context_messages = [] + return self.session_id + + def run(self) -> None: + """Main loop""" + try: + force_model = None + while True: + user_input = input("\nJames: ").strip() + if not user_input: + continue + if user_input.lower() in ['exit', 'quit']: + print("\nšŸ‘‹ Later!") + self.end_session() + break + if user_input.startswith('/'): + cmd = user_input.lower() + if cmd == '/fast': + force_model = "fast" + print("⚔ Next: FAST model") + continue + elif cmd == '/balanced': + force_model = "balanced" + print("āš–ļø Next: BALANCED model") + continue + elif cmd == '/help': + print("\nšŸ’” Commands:") + print("/fast - Use fast model") + print("/balanced - Use balanced model") + print("/index - Index local repositories") + print("/scan - Scan style signature (V3.0)") + print("/help - This message") + print("exit - End session\n") + continue + elif cmd.startswith('/index'): + parts = user_input.split(maxsplit=1) + if len(parts) > 1: + self.index_local_repositories(parts[1]) + else: + print("Usage: /index ") + continue + elif cmd == '/scan': + self.scan_style_signature() + continue + else: + print("\nUnknown command. Type /help") + continue + # Chat + response = self.chat(user_input, force_model) + print(f"\nBuddAI:\n{response}\n") + force_model = None + except KeyboardInterrupt: + print("\n\nšŸ‘‹ Bye!") + self.end_session() + + +# --- Server Implementation --- +if SERVER_AVAILABLE: + app = FastAPI(title="BuddAI API", version="3.1") + app = FastAPI(title="BuddAI API", version="3.2") + + # Allow React frontend to communicate + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], + ) + + class ChatRequest(BaseModel): + message: str + model: Optional[str] = None + forge_mode: Optional[str] = "2" + + class SessionLoadRequest(BaseModel): + session_id: str + + class SessionRenameRequest(BaseModel): + session_id: str + title: str + + class SessionDeleteRequest(BaseModel): + session_id: str + + # Multi-user support + class BuddAIManager: + def __init__(self): + self.instances: Dict[str, BuddAI] = {} + + def get_instance(self, user_id: str) -> BuddAI: + if user_id not in self.instances: + self.instances[user_id] = BuddAI(user_id=user_id, server_mode=True) + return self.instances[user_id] + + buddai_manager = BuddAIManager() + + # Serve Frontend + frontend_path = Path(__file__).parent / "frontend" + frontend_path.mkdir(exist_ok=True) + app.mount("/web", StaticFiles(directory=frontend_path, html=True), name="web") + + @app.get("/", response_class=HTMLResponse) + async def root(): + server_buddai = buddai_manager.get_instance("default") + status = server_buddai.get_user_status() + return f""" + + + BuddAI API + + + + + BuddAI +

BuddAI API Online

+

Current Mode: {status}

+

Visit /web or /docs

+ + + """ + + @app.get("/favicon.ico", include_in_schema=False) + async def favicon(): + return FileResponse(Path(__file__).parent / "icons" / "icon.png") + + @app.get("/favicon-16x16.png", include_in_schema=False) + async def favicon_16(): + return FileResponse(Path(__file__).parent / "icons" / "favicon-16x16.png") + + @app.get("/favicon-32x32.png", include_in_schema=False) + async def favicon_32(): + return FileResponse(Path(__file__).parent / "icons" / "favicon-32x32.png") + + @app.get("/favicon-192x192.png", include_in_schema=False) + async def favicon_192(): + return FileResponse(Path(__file__).parent / "icons" / "favicon-192x192.png") + + def validate_upload(file: UploadFile) -> bool: + # Check size + file.file.seek(0, 2) + size = file.file.tell() + file.file.seek(0) + + if size > MAX_FILE_SIZE: + raise ValueError(f"File too large (Limit: {MAX_FILE_SIZE//1024//1024}MB)") + + # Magic number check for ZIPs + if file.filename.endswith('.zip'): + header = file.file.read(4) + file.file.seek(0) + if header != b'PK\x03\x04': + raise ValueError("Invalid ZIP file header") + + if file.content_type not in ALLOWED_TYPES: + # Fallback: check extension if content_type is generic + ext = Path(file.filename).suffix.lower() + if ext not in ['.zip', '.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']: + raise ValueError("Invalid file type") + # Scan for malicious content + return True + + def sanitize_filename(filename: str) -> str: + clean = re.sub(r'[^a-zA-Z0-9_.-]', '_', filename) + return clean if clean else "upload.bin" + + def safe_extract_zip(zip_path: Path, extract_path: Path): + """Extract zip file with Zip Slip protection""" + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + for member in zip_ref.infolist(): + target_path = extract_path / member.filename + # Resolve paths to ensure they stay within extract_path + if not str(target_path.resolve()).startswith(str(extract_path.resolve())): + raise ValueError(f"Malicious zip member: {member.filename}") + zip_ref.extractall(extract_path) + + @app.post("/api/chat") + async def chat_endpoint(request: ChatRequest, user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + response = server_buddai.chat(request.message, force_model=request.model, forge_mode=request.forge_mode) + return {"response": response} + + @app.websocket("/api/ws/chat") + async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + data = await websocket.receive_json() + user_message = data.get("message") + user_id = data.get("user_id", "default") + model = data.get("model") + forge_mode = data.get("forge_mode", "2") + + server_buddai = buddai_manager.get_instance(user_id) + + for chunk in server_buddai.chat_stream(user_message, model, forge_mode): + await websocket.send_json({"type": "token", "content": chunk}) + + await websocket.send_json({"type": "end"}) + except WebSocketDisconnect: + pass + + @app.get("/api/history") + async def history_endpoint(user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + return {"history": server_buddai.context_messages} + + @app.get("/api/sessions") + async def sessions_endpoint(user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + return {"sessions": server_buddai.get_sessions()} + + @app.post("/api/session/load") + async def load_session_endpoint(req: SessionLoadRequest, user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + history = server_buddai.load_session(req.session_id) + return {"history": history, "session_id": req.session_id} + + @app.post("/api/session/rename") + async def rename_session_endpoint(req: SessionRenameRequest, user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + server_buddai.rename_session(req.session_id, req.title) + return {"status": "success"} + + @app.post("/api/session/delete") + async def delete_session_endpoint(req: SessionDeleteRequest, user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + server_buddai.delete_session(req.session_id) + return {"status": "success"} + + @app.post("/api/session/new") + async def new_session_endpoint(user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + new_id = server_buddai.start_new_session() + return {"session_id": new_id} + + @app.post("/api/upload") + async def upload_repo(file: UploadFile = File(...), user_id: str = Header("default")): + server_buddai = buddai_manager.get_instance(user_id) + try: + validate_upload(file) + + uploads_dir = DATA_DIR / "uploads" + uploads_dir.mkdir(exist_ok=True) + + # Enforce MAX_UPLOAD_FILES (Hardening) + existing_items = sorted(uploads_dir.iterdir(), key=lambda p: p.stat().st_mtime) + while len(existing_items) >= MAX_UPLOAD_FILES: + oldest = existing_items.pop(0) + if oldest.is_dir(): + shutil.rmtree(oldest) + else: + oldest.unlink() + + safe_name = sanitize_filename(file.filename) + file_location = uploads_dir / safe_name + with open(file_location, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + + if safe_name.endswith(".zip"): + extract_path = uploads_dir / file_location.stem + extract_path.mkdir(exist_ok=True) + safe_extract_zip(file_location, extract_path) + server_buddai.index_local_repositories(extract_path) + file_location.unlink() # Cleanup zip + return {"message": f"āœ… Successfully indexed {safe_name}"} + else: + # Support single code files by moving them to a folder and indexing + if file_location.suffix in ['.py', '.ino', '.cpp', '.h', '.js', '.jsx', '.html', '.css']: + target_dir = uploads_dir / file_location.stem + target_dir.mkdir(exist_ok=True) + final_path = target_dir / safe_name + shutil.move(str(file_location), str(final_path)) + server_buddai.index_local_repositories(target_dir) + return {"message": f"āœ… Successfully indexed {safe_name}"} + + return {"message": f"āœ… Successfully uploaded {safe_name}"} + except Exception as e: + return {"message": f"āŒ Error: {str(e)}"} + +def check_ollama() -> bool: + try: + conn = http.client.HTTPConnection(OLLAMA_HOST, OLLAMA_PORT, timeout=5) + conn.request("GET", "/api/tags") + response = conn.getresponse() + conn.close() + return response.status == 200 + except: + return False + + +def main() -> None: + if not check_ollama(): + print("āŒ Ollama not running. Start: ollama serve") + sys.exit(1) + + if len(sys.argv) > 1 and sys.argv[1] == "--server": + if SERVER_AVAILABLE: + print("šŸš€ Starting BuddAI API Server on port 8000...") + uvicorn.run(app, host="0.0.0.0", port=8000) + else: + print("āŒ Server dependencies missing. Install: pip install fastapi uvicorn aiofiles python-multipart") + else: + buddai = BuddAI() + buddai.run() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/data/conversations.db b/data/conversations.db index c3fb93a728cc3ed81e4e930c11d16afcdd5e9917..ce730951e2a88366d91dd6051e785988cc1b6a47 100644 GIT binary patch delta 5214 zcmaLX2VB(kAIEWy!@C3SP+W+J0*ZpbalaeD;odn?OI)RqZ!Rz_>aeTjz&;7>Tuwe_z4^z{k!nXzlc!vi3 z`x7!f*;8q2fOAZu!*(zr*s>tNy0`ci$6!aiy+mkY=n>0;;2Ueg>sn$hAtTak)`-|x z)2Q@(TS$y4GC5|@fNuS>dv%S;YSbYsafI1qjfjpmjqI3r!FsM#q5p>MGezegwMCjj zMkEDWBT`cXN15|a+RB*1?p>Il5V}9e?6gGZSxYpCv@Np@w$-+UShri}Tf16amTQ)c zmI;=2mJQ*p9VWYJO-x_`Ghfi$7BgcZ%DeW%x3^ z?o6-C={5X*x4Ub%?p+4;8R$;2rh4w#=t^_xC)-zGt1B(j<pN%xuNEwHulwCaVYX_(iQR$zybt7+a$SDMrCyZtWD-D!#U zndT|5xA3%tBBpta^a49LHBY9I?)2(P8MiKEE?2VJeWtk!>@7U4>iKSgvmPo%?3|pJ`Q!nCAA}b0sy+o#{z)`n~Bsclw8fl^D(WHm z@2F<}tiop&SHvurr_clGc4c~8PLC(Umv;B2Iywx8)qcR9Zy#^Z-VlDl2;5=i6OkB{my;5fd+Q_M zEMrbJ@lJ1jEbcztDW%Lw|Gs+r@l7sjr8lYYN}D%uiZYS%J;lx=n?V+y5dIWq>Rl;F{wahRza$mvM8&vsbCeNLX}-P zRG2ERN~myEQk7DrRT))QMW}KrQk7Rxs)CAE6;&k_qheK@s;sK0cvV$ZQwb_jRaZ$W zS@E8oDpk3Zq1?)&yvnE2RJzJgewC?esG6#ls;%m%x~iV4uNtU^s*!4}ny3dWzSQf*Z`)n0W_9n~Z1QPoLxR*$J{^|F~Oj_RxWsi)O5s=peb2C6~oSv6PSZ-k%~G$ZSJiAaN6l69)O_`tTA&uHMe23+ zhI&&iR!h`dYN=YL-d69ZRt7o%2)5J57dWhjrvG^tUgg|)jG9aeX2I7 z&(!DY3$;;gQk&J6YK!_xeXX{tZ`8NyJN3Q#L2Xmp)ef~&?NYnd9`&QztA0{Ht9@#} zI-m}!L+Y?PqK>Lx)UWE8I<8Krlj@W@tk>YVybomUssMRiGCR==w&>Z-b?{!rJ| zpX!GCOZ~0>Q8y>*vp-cqAebQtih%{JV1r-?fl#o61Hzy(jXl&zz>;F z18PDos10?XF4Tki&;S}jBWMgw-~ng~&7e87fCr%^w1U>~5IhWRAPd?;J7^Cbpd&m2 zk3uKt439xJJPuvp33w8^LO19RJ)kG_g5K~H^no1c3;p0}cn12z02l~^;8_?9L*O}h z9$tW<@FKhfxiAcd!w47&qhK_Qfw3?S#=``d2$NtkOo6E|4W`2k$b*+*Cd`6Y;8mCn zb6_sagZc0pEP#cu2wsOb;7wQzOW-Y73d`Vacn6ll3RnrNU^To8??FDi4&1-^o>VJmzC-@O%u)2#ugIG=T@8DKvxT&;lNWme2}X!$a^ew1F&W3+R66C@#7!D&~B#eU5Fb2lLI2aETU?NO{$uI?`!Zer;GawIMhM6!6 zUV&F(Hq3#!Fc0R#Yp?(o!XkJb-hek@F)V?%U@0tvx8WUF4l7_Ktb*0>F1!c%@IHJ1 zAHo{=2tI~SU@feJ_3$ZdfY0D__yRV9D$?o3;YVl;5eLslW+=7!x=aW=ioOu4;SDfT!PE+ zJ6wUQa1H)|>+mPsfWP2x_y=wV>g&&}0}}*-8G@h~SilN42!;>{1v@w(42nYu2#1nT z3Q9v6C<_r#4kDpEL_r0JhKf)LVjvdcpfXf}c&G~1AORAgIwV0d+&V!gN2%Zf1Ki*N pFZdu0(jf!2pdmDZ#^w$B`ZMRv_z%3aZ!rJ> delta 4143 zcmYM!RZx{%7>98-wW&=Rv}{lml!GGJq9|gw*xiYVT?lq7cDG_HCI)tQcVKsSo&V#U zne)v&zj-g##kVfjo0z!JJ5f#caZ9(bh#X|`|7V;M8sVRA5jHZ!$<;B$@xJ|UyE_?fHGK^oj1I$k8;l-)eum_zlt<3@-9pU?9TU2=HOIv^hzm*b zvp1$?+IbmFW^+hVu$|E(Aiyx(CoR?1Y)A`syk%__V3(OZ|&x{%U|4s0OLQ zYKR)D64fx3q>|NeHA0P4qts|MMvYbD)Oa;PO;nTAWHm)iRViwknyzN3nQE4rt>&n? zYMz>}7N~`4ky@;lsHJL|TCP^8m1>nrRjbt+wN|ZD>(vIeQEgJ2RhrtOwyJGvyV{|4 zs&utW?N)o#UX`Ktsr~AJI;ak*!|I4Ss*b7S>V!I}PN~!Cj5@2%sq^ZBx~MLx%j$}{ zs;;T)>V~?hZmHYqj=HPvsrxEZJx~wTBlTE4QBTz~^<2GBFV!pcTD?(k)jRcGeNZ3O zC-qrI0O3#&A|Mh9 zL18EYMWGlJhZ0Z{qM#I%hG-}QWg!O2K`fMq3Q!R$L1m}{RiPSGhZ;~5YC&zN194Cn z>Op;I01crLG=?V76q-SEXaOyu6|{yn&=%T3d*}cip%Zk5E)Wl0p&N9E9?%ndK?3xK zKF}BXL4Ozk17Q#hh9NK%5@8r5K{5=75ik-)!Dtu*V__VOhY2tdCc$Kw0#hLcronWW z0W)D1%!WBI7v{lySO5!Q5iEu!uoRZTa##T?VHKppYFGnnVI8c84X_b5!DdKb+`dH;TGJ6J8&27!F|Yt2k;Oc!DDy=PvIFnhZpb?Ucqa418?CSyoV3)5kA3Z z_yS+y8+?Z!@DqN)Z}?--`m@wv086j}Yp?-ZFoGS}g9A8%6PUmmT)-9Fz#XzcR>%g~ z!2>+O3%tPxe8CTLfIpZa00JQ=*U@pvq`LF;M!Xj7CSsh1IYI*1|ei4;x@3Y=X^@23ue&Y=iBv z19n0>?1J5}2lhe+?1TMq01m<-I1ESNC>(?1Z~{)kDL4&h;4GYj^Kbz!!X>y2SKumK zgX?euZo)0N4R_!!+=Kg&2@l{QJc7sY1fIe(cn&Y%CA@;y@CM$(J9rNt;3Is3&+rAl z!Z-L1Kj0_)g5U7RQtQu3g8?kT3ar5fY{3Y2U=I%92u@%EXK(>ma07S90$Cv&WCsuM z1TXLgAMgb~$N~Ofh5!hJoRABGAUEWJU { endRef.current?.scrollIntoView({ behavior: "smooth" }); @@ -266,6 +267,17 @@ return () => clearInterval(timer); }, []); + useEffect(() => { + // Initialize WebSocket + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${protocol}//${window.location.host}/api/ws/chat`; + socketRef.current = new WebSocket(wsUrl); + + return () => { + if (socketRef.current) socketRef.current.close(); + }; + }, []); + const handleRename = async (e) => { if (e.key === 'Enter') { await fetch("/api/session/rename", { @@ -327,30 +339,49 @@ if (!textOverride) setInput(""); setLoading(true); - // Cancel previous request if any - if (abortControllerRef.current) abortControllerRef.current.abort(); - const controller = new AbortController(); - abortControllerRef.current = controller; + // Use WebSocket if available + if (socketRef.current && socketRef.current.readyState === WebSocket.OPEN) { + // Add placeholder for streaming response + setHistory(prev => [...prev, { role: "assistant", content: "" }]); + + socketRef.current.send(JSON.stringify({ + message: msgText, + forge_mode: forgeMode, + user_id: "default" // In a real app, get from auth context + })); - try { - const res = await fetch("/api/chat", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ message: msgText, forge_mode: forgeMode }), - signal: controller.signal - }); - const data = await res.json(); - setHistory(prev => [...prev, { role: "assistant", content: data.response }]); - if (!currentSessionId) fetchSessions(); // Refresh list if this was first msg - } catch (err) { - if (err.name === 'AbortError') { - setHistory(prev => [...prev, { role: "assistant", content: "šŸ›‘ *Generation stopped by user.*" }]); - } else { + socketRef.current.onmessage = (event) => { + const data = JSON.parse(event.data); + if (data.type === 'token') { + setHistory(prev => { + const newHistory = [...prev]; + const lastMsg = newHistory[newHistory.length - 1]; + if (lastMsg.role === 'assistant') { + lastMsg.content += data.content; + } + return newHistory; + }); + } else if (data.type === 'end') { + setLoading(false); + if (!currentSessionId) fetchSessions(); + } + }; + } else { + // Fallback to HTTP + try { + const res = await fetch("/api/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: msgText, forge_mode: forgeMode }) + }); + const data = await res.json(); + setHistory(prev => [...prev, { role: "assistant", content: data.response }]); + if (!currentSessionId) fetchSessions(); + } catch (err) { setHistory(prev => [...prev, { role: "assistant", content: "Error connecting to BuddAI server." }]); } + setLoading(false); } - setLoading(false); - abortControllerRef.current = null; }; const stopGeneration = () => { @@ -394,7 +425,7 @@
BuddAI -

BuddAI v3

+

BuddAI v3.2

{status}
diff --git a/tests/test_buddai.py b/tests/test_buddai.py index 8f87987..1743c3f 100644 --- a/tests/test_buddai.py +++ b/tests/test_buddai.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -BuddAI v3.1 Test Suite +BuddAI v3.2 Test Suite Comprehensive testing for all features Author: James Gilbert @@ -8,11 +8,25 @@ License: MIT """ import sys +import importlib.util +from unittest.mock import MagicMock, patch import sqlite3 import tempfile import shutil from pathlib import Path from datetime import datetime +import os +import io +import zipfile + +# Dynamic import of buddai_v3.2.py +REPO_ROOT = Path(__file__).parent.parent +MODULE_PATH = REPO_ROOT / "buddai_v3.2.py" +spec = importlib.util.spec_from_file_location("buddai_v3_2", MODULE_PATH) +buddai_module = importlib.util.module_from_spec(spec) +sys.modules["buddai_v3_2"] = buddai_module +spec.loader.exec_module(buddai_module) +BuddAI = buddai_module.BuddAI # Test utilities class TestColors: @@ -544,10 +558,318 @@ def test_context_window(): return False +# Test 12: Schedule Awareness (New) +def test_schedule_awareness(): + print_test("Schedule Awareness") + + # Mock datetime to test different times + with patch('buddai_v3_2.datetime') as mock_date: + # 1. Early Morning (Monday 6:00 AM) + mock_date.now.return_value = datetime(2025, 12, 29, 6, 0, 0) + + buddai = BuddAI(server_mode=False) + status = buddai.get_user_status() + + if "Early Morning" in status: + print_pass(f"6:00 AM Mon -> {status}") + else: + print_fail(f"Failed Morning check: {status}") + return False + + # 2. Work Hours (Monday 10:00 AM) + mock_date.now.return_value = datetime(2025, 12, 29, 10, 0, 0) + status = buddai.get_user_status() + + if "Work Hours" in status: + print_pass(f"10:00 AM Mon -> {status}") + else: + print_fail(f"Failed Work check: {status}") + return False + + return True + + +# Test 13: Modular Plan Generation (New) +def test_modular_plan(): + print_test("Modular Plan Generation") + + buddai = BuddAI(server_mode=False) + modules = ["ble", "servo"] + plan = buddai.build_modular_plan(modules) + + # Expect 3 steps: ble, servo, integration + if len(plan) == 3: + tasks = [p['module'] for p in plan] + if "integration" in tasks and "ble" in tasks: + print_pass(f"Generated {len(plan)} steps including Integration") + return True + + print_fail(f"Plan generation failed. Got {len(plan)} steps: {plan}") + return False + + +# Test 14: Session Management (New) +def test_session_management(): + print_test("Session Management (CRUD)") + + # Use a named temporary file to handle Windows file locking better + fd, test_db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + test_db = Path(test_db_path) + + try: + with patch('buddai_v3_2.DB_PATH', test_db): + buddai = BuddAI(server_mode=False) + + # 1. Create + sid = buddai.start_new_session() + print_pass(f"Created session: {sid}") + + # 2. Rename + buddai.rename_session(sid, "Unit Test Session") + sessions = buddai.get_sessions(limit=1) + if not sessions or sessions[0]['title'] != "Unit Test Session": + print_fail("Rename failed") + return False + print_pass("Renamed session successfully") + + # 3. Delete + buddai.delete_session(sid) + sessions = buddai.get_sessions(limit=5) + if any(s['id'] == sid for s in sessions): + print_fail("Delete failed - session still exists") + return False + print_pass("Deleted session successfully") + finally: + # Manual cleanup with error suppression for Windows locks + try: + if test_db.exists(): + os.unlink(test_db) + except Exception: + pass + + return True + + +# Test 15: Rapid Session Creation (Collision Handling) +def test_rapid_session_creation(): + print_test("Rapid Session Creation (Collision Handling)") + + # Use a named temporary file to handle Windows file locking better + fd, test_db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + test_db = Path(test_db_path) + + try: + # Mock datetime to return a fixed time, forcing ID collisions + fixed_time = datetime(2025, 1, 1, 12, 0, 0) + + with patch('buddai_v3_2.DB_PATH', test_db): + with patch('buddai_v3_2.datetime') as mock_dt: + mock_dt.now.return_value = fixed_time + + buddai = BuddAI(server_mode=False) + + ids = [buddai.session_id] # Capture session from __init__ + + # Create 4 more sessions rapidly + for _ in range(4): + ids.append(buddai.start_new_session()) + + # Verify format + base_id = fixed_time.strftime("%Y%m%d_%H%M%S") + expected = [base_id] + [f"{base_id}_{i}" for i in range(1, 5)] + + if ids == expected: + print_pass(f"Generated unique IDs with suffixes: {ids}") + return True + else: + print_fail(f"Unexpected ID format. Expected {expected}, got {ids}") + return False + finally: + try: + if test_db.exists(): + os.unlink(test_db) + except Exception: + pass + +# Test 16: Repository Isolation (Multi-User) +def test_repo_isolation(): + print_test("Repository Isolation (Multi-User)") + + # Use a named temporary file for DB + fd, test_db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + test_db = Path(test_db_path) + + # Create a temp directory for repo + with tempfile.TemporaryDirectory() as tmp_repo: + repo_path = Path(tmp_repo) + + # Create a unique file for User 1 + (repo_path / "user1_secret.py").write_text("def user1_secret_function():\n pass") + + try: + with patch('buddai_v3_2.DB_PATH', test_db): + # Suppress internal prints to keep test output clean + with patch('builtins.print'): + # User 1 indexes the repo + buddai1 = BuddAI(user_id="user1", server_mode=False) + buddai1.index_local_repositories(str(repo_path)) + + # User 2 instance + buddai2 = BuddAI(user_id="user2", server_mode=False) + + # User 1 searches + res1 = buddai1.search_repositories("user1_secret_function") + + # User 2 searches + res2 = buddai2.search_repositories("user1_secret_function") + + # Verify User 1 found it + if "Found 1 matches" in res1 or "user1_secret_function" in res1: + print_pass("User 1 found their indexed code") + else: + print_fail(f"User 1 failed to find code: {res1}") + return False + + # Verify User 2 did NOT find it + if "No functions found" in res2: + print_pass("User 2 could not see User 1's code") + else: + print_fail(f"User 2 saw restricted code: {res2}") + return False + + finally: + try: + if test_db.exists(): + os.unlink(test_db) + except Exception: + pass + + return True + +# Test 17: Upload Security (Hardening) +def test_upload_security(): + print_test("Upload Security (Hardening)") + + # 1. Test Magic Byte Check + # We need to mock UploadFile since it's a FastAPI class + class MockUploadFile: + def __init__(self, filename, content): + self.filename = filename + self.file = io.BytesIO(content) + self.content_type = "application/zip" + + if hasattr(buddai_module, 'validate_upload'): + # Create a fake zip (text file renamed) + fake_zip = MockUploadFile("fake.zip", b"This is not a zip file") + try: + buddai_module.validate_upload(fake_zip) + print_fail("Magic byte check failed (accepted invalid zip)") + return False + except ValueError as e: + if "Invalid ZIP file header" in str(e): + print_pass("Magic byte check rejected invalid zip header") + else: + print_fail(f"Unexpected error: {e}") + return False + else: + print_warn("Skipping magic byte check (validate_upload not available)") + + # 2. Test Zip Slip Protection + if hasattr(buddai_module, 'safe_extract_zip'): + with tempfile.TemporaryDirectory() as tmpdir: + malicious_zip_path = Path(tmpdir) / "slip.zip" + extract_dir = Path(tmpdir) / "extract" + extract_dir.mkdir() + + # Create a zip file with a member pointing to parent directory + # We use zipfile to craft this manually + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, 'w') as zf: + zf.writestr('../evil.txt', 'malicious content') + + malicious_zip_path.write_bytes(zip_buffer.getvalue()) + + try: + buddai_module.safe_extract_zip(malicious_zip_path, extract_dir) + print_fail("Zip Slip protection failed (extracted malicious file)") + return False + except ValueError as e: + if "Malicious zip member" in str(e): + print_pass("Zip Slip protection caught directory traversal") + else: + print_fail(f"Unexpected error during extraction: {e}") + return False + return True + +# Test 18: WebSocket Logic (Streaming) +def test_websocket_logic(): + print_test("WebSocket Logic (Streaming)") + + # Use a named temporary file for DB + fd, test_db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + test_db = Path(test_db_path) + + try: + with patch('buddai_v3_2.DB_PATH', test_db): + # Suppress prints during init + with patch('builtins.print'): + buddai = BuddAI(server_mode=False) + + # Mock call_model to return a generator + def mock_generator(*args, **kwargs): + yield "Stream" + yield "ing" + yield "..." + + with patch.object(buddai, 'call_model', side_effect=mock_generator) as mock_call: + # Mock shadow engine to avoid DB lookups or side effects affecting output + with patch.object(buddai.shadow_engine, 'get_all_suggestions', return_value=[]): + + # Execute + stream = buddai.chat_stream("Test Message", force_model="fast") + chunks = list(stream) + full_text = "".join(chunks) + + # Verify 1: Content + if full_text == "Streaming...": + print_pass("Streamed content matches expected output") + else: + print_fail(f"Stream content mismatch. Got: '{full_text}'") + return False + + # Verify 2: Stream flag passed to model + args, kwargs = mock_call.call_args + if kwargs.get('stream') is True: + print_pass("call_model invoked with stream=True") + else: + print_fail(f"call_model arguments incorrect: {kwargs}") + return False + + # Verify 3: Context saved + last_msg = buddai.context_messages[-1] + if last_msg['role'] == 'assistant' and last_msg['content'] == "Streaming...": + print_pass("Conversation context updated correctly") + else: + print_fail("Context update failed") + return False + + finally: + try: + if test_db.exists(): + os.unlink(test_db) + except Exception: + pass + + return True + # Main Test Runner def run_all_tests(): print("\n" + "="*60) - print("šŸ”„ BuddAI v3.1 Comprehensive Test Suite") + print("šŸ”„ BuddAI v3.2 Comprehensive Test Suite") print("="*60) tests = [ @@ -562,6 +884,13 @@ def run_all_tests(): ("Repository Indexing", test_repository_indexing), ("Search Query Safety", test_search_query_safety), ("Context Window", test_context_window), + ("Schedule Awareness", test_schedule_awareness), + ("Modular Plan", test_modular_plan), + ("Session Management", test_session_management), + ("Rapid Session Creation", test_rapid_session_creation), + ("Repository Isolation", test_repo_isolation), + ("Upload Security", test_upload_security), + ("WebSocket Logic", test_websocket_logic), ] results = [] diff --git a/tests/test_buddai_v3_2.py b/tests/test_buddai_v3_2.py new file mode 100644 index 0000000..eee32df --- /dev/null +++ b/tests/test_buddai_v3_2.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +Unit tests for BuddAI v3.2 +Verifies type hints and core functionality including the new routing logic. +""" + +import unittest +import sys +import importlib.util +from pathlib import Path +from typing import List, Dict, Optional +from unittest.mock import MagicMock, patch + +# Load buddai_v3.2.py dynamically due to version number in filename +REPO_ROOT = Path(__file__).parent.parent +MODULE_PATH = REPO_ROOT / "buddai_v3.2.py" + +if not MODULE_PATH.exists(): + print(f"Error: Could not find {MODULE_PATH}") + sys.exit(1) + +spec = importlib.util.spec_from_file_location("buddai_v3_2", MODULE_PATH) +buddai_module = importlib.util.module_from_spec(spec) +sys.modules["buddai_v3_2"] = buddai_module +spec.loader.exec_module(buddai_module) + +BuddAI = buddai_module.BuddAI + +class TestBuddAITypesAndLogic(unittest.TestCase): + + def setUp(self): + # Suppress print statements during tests + self.original_stdout = sys.stdout + sys.stdout = MagicMock() + + # Initialize BuddAI in non-server mode, mocking DB interactions + with patch('sqlite3.connect') as mock_sql: + # Mock mkdir to prevent creating directories during tests + with patch('pathlib.Path.mkdir'): + self.buddai = BuddAI(server_mode=False) + self.mock_conn = mock_sql.return_value + self.mock_cursor = self.mock_conn.cursor.return_value + + def tearDown(self): + sys.stdout = self.original_stdout + + def test_method_annotations(self): + """Verify type hints exist on key methods""" + # chat + chat_hints = BuddAI.chat.__annotations__ + self.assertEqual(chat_hints['user_message'], str) + self.assertEqual(chat_hints['return'], str) + + # is_complex + self.assertEqual(BuddAI.is_complex.__annotations__['return'], bool) + + # extract_modules + self.assertEqual(BuddAI.extract_modules.__annotations__['return'], List[str]) + + # build_modular_plan + self.assertEqual(BuddAI.build_modular_plan.__annotations__['return'], List[Dict[str, str]]) + + def test_routing_simple_question(self): + """Test that simple questions route to the FAST model""" + with patch.object(self.buddai, 'call_model', return_value="Fast response") as mock_call: + response = self.buddai._route_request("What is a servo?", force_model=None, forge_mode="2") + + mock_call.assert_called_with("fast", "What is a servo?") + self.assertEqual(response, "Fast response") + + def test_routing_complex_request(self): + """Test that complex requests route to modular build""" + complex_msg = "Build a complete robot with servo and motor" + + with patch.object(self.buddai, 'execute_modular_build', return_value="Modular code") as mock_build: + # Mock is_complex to ensure it returns True for this test case + with patch.object(self.buddai, 'is_complex', return_value=True): + response = self.buddai._route_request(complex_msg, force_model=None, forge_mode="2") + + mock_build.assert_called() + self.assertEqual(response, "Modular code") + + def test_routing_search_query(self): + """Test that search queries route to repository search""" + search_msg = "Show me functions using applyForge" + + with patch.object(self.buddai, 'search_repositories', return_value="Search results") as mock_search: + # Mock is_search_query to ensure True + with patch.object(self.buddai, 'is_search_query', return_value=True): + # Ensure is_complex is False so it doesn't preempt search + with patch.object(self.buddai, 'is_complex', return_value=False): + response = self.buddai._route_request(search_msg, force_model=None, forge_mode="2") + + mock_search.assert_called_with(search_msg) + self.assertEqual(response, "Search results") + + def test_routing_forced_model(self): + """Test that force_model overrides other logic""" + with patch.object(self.buddai, 'call_model', return_value="Forced response") as mock_call: + response = self.buddai._route_request("Complex build request", force_model="balanced", forge_mode="2") + + mock_call.assert_called_with("balanced", "Complex build request") + self.assertEqual(response, "Forced response") + + def test_extract_modules(self): + """Verify module extraction logic""" + msg = "I need a robot with bluetooth and a flipper weapon" + modules = self.buddai.extract_modules(msg) + self.assertIn("ble", modules) + self.assertIn("servo", modules) + self.assertNotIn("motor", modules) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file