Fix try/finally for concurrency, remove duplicate maxConcurrent, add verbose config log

Co-Authored-By: Lars Baunwall <larslb@thinkability.dk>
This commit is contained in:
Devin AI 2025-08-12 17:42:50 +00:00
parent 4bc97d81ac
commit 89bb2c0659

View file

@ -49,6 +49,7 @@ async function startBridge() {
const token = (cfg.get<string>('token') ?? '').trim();
const hist = cfg.get<number>('historyWindow') ?? 3;
const verbose = cfg.get<boolean>('verbose') ?? false;
const maxConc = cfg.get<number>('maxConcurrent') ?? 1;
try {
try {
@ -63,6 +64,26 @@ async function startBridge() {
if (token && req.headers.authorization !== `Bearer ${token}`) {
writeJson(res, 401, { error: { message: 'unauthorized', type: 'invalid_request_error', code: 'unauthorized' } });
return;
if (req.method === 'POST' && req.url?.startsWith('/v1/chat/completions')) {
if (activeRequests >= maxConc) {
res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '1' });
res.end(JSON.stringify({ error: { message: 'too many requests', type: 'rate_limit_error', code: 'rate_limit_exceeded' } }));
if (verbose) output?.appendLine(`429 throttled (active=${activeRequests}, max=${maxConc})`);
return;
}
}
activeRequests++;
if (verbose) output?.appendLine(`Request started (active=${activeRequests})`);
}
if (req.method === 'POST' && req.url?.startsWith('/v1/chat/completions')) {
if (activeRequests >= maxConc) {
res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '1' });
res.end(JSON.stringify({ error: { message: 'too many requests', type: 'rate_limit_error', code: 'rate_limit_exceeded' } }));
if (verbose) output?.appendLine(`429 throttled (active=${activeRequests}, max=${maxConc})`);
return;
}
}
if (req.method === 'GET' && req.url === '/healthz') {
@ -81,65 +102,72 @@ async function startBridge() {
return;
}
const body = await readJson(req);
const messages = Array.isArray(body?.messages) ? body.messages : null;
if (!messages || messages.length === 0 || !messages.every((m: any) =>
m && typeof m.role === 'string' &&
/^(system|user|assistant)$/.test(m.role) &&
m.content !== undefined && m.content !== null
)) {
writeJson(res, 400, { error: { message: 'invalid request', type: 'invalid_request_error', code: 'invalid_payload' } });
return;
}
const prompt = normalizeMessages(messages, hist);
const streamMode = body?.stream !== false;
activeRequests++;
if (verbose) output?.appendLine(`Request started (active=${activeRequests})`);
try {
const body = await readJson(req);
const messages = Array.isArray(body?.messages) ? body.messages : null;
if (!messages || messages.length === 0 || !messages.every((m: any) =>
m && typeof m.role === 'string' &&
/^(system|user|assistant)$/.test(m.role) &&
m.content !== undefined && m.content !== null
)) {
writeJson(res, 400, { error: { message: 'invalid request', type: 'invalid_request_error', code: 'invalid_payload' } });
return;
}
const prompt = normalizeMessages(messages, hist);
const streamMode = body?.stream !== false;
const session = await access.startSession();
const chatStream = await session.sendRequest({ prompt, attachments: [] });
const session = await access.startSession();
const chatStream = await session.sendRequest({ prompt, attachments: [] });
if (streamMode) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const id = `cmp_${Math.random().toString(36).slice(2)}`;
if (verbose) output?.appendLine(`SSE start id=${id}`);
const h1 = chatStream.onDidProduceContent((chunk) => {
const payload = {
id,
object: 'chat.completion.chunk',
choices: [{ index: 0, delta: { content: chunk } }]
};
res.write(`data: ${JSON.stringify(payload)}\n\n`);
});
const endAll = () => {
if (verbose) output?.appendLine(`SSE end id=${id}`);
res.write('data: [DONE]\n\n');
res.end();
h1.dispose();
h2.dispose();
};
const h2 = chatStream.onDidEnd(endAll);
req.on('close', endAll);
return;
} else {
let buf = '';
const h1 = chatStream.onDidProduceContent((chunk) => { buf += chunk; });
await new Promise<void>((resolve) => {
const h2 = chatStream.onDidEnd(() => {
if (streamMode) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const id = `cmp_${Math.random().toString(36).slice(2)}`;
if (verbose) output?.appendLine(`SSE start id=${id}`);
const h1 = chatStream.onDidProduceContent((chunk) => {
const payload = {
id,
object: 'chat.completion.chunk',
choices: [{ index: 0, delta: { content: chunk } }]
};
res.write(`data: ${JSON.stringify(payload)}\n\n`);
});
const endAll = () => {
if (verbose) output?.appendLine(`SSE end id=${id}`);
res.write('data: [DONE]\n\n');
res.end();
h1.dispose();
h2.dispose();
resolve();
};
const h2 = chatStream.onDidEnd(endAll);
req.on('close', endAll);
return;
} else {
let buf = '';
const h1 = chatStream.onDidProduceContent((chunk) => { buf += chunk; });
await new Promise<void>((resolve) => {
const h2 = chatStream.onDidEnd(() => {
h1.dispose();
h2.dispose();
resolve();
});
});
});
if (verbose) output?.appendLine(`Non-stream complete len=${buf.length}`);
writeJson(res, 200, {
id: `cmpl_${Math.random().toString(36).slice(2)}`,
object: 'chat.completion',
choices: [{ index: 0, message: { role: 'assistant', content: buf }, finish_reason: 'stop' }]
});
return;
if (verbose) output?.appendLine(`Non-stream complete len=${buf.length}`);
writeJson(res, 200, {
id: `cmpl_${Math.random().toString(36).slice(2)}`,
object: 'chat.completion',
choices: [{ index: 0, message: { role: 'assistant', content: buf }, finish_reason: 'stop' }]
});
return;
}
} finally {
activeRequests--;
if (verbose) output?.appendLine(`Request complete (active=${activeRequests})`);
}
}
@ -159,6 +187,10 @@ async function startBridge() {
const shown = addr ? `${addr.address}:${addr.port}` : `${host}:${portCfg}`;
statusItem!.text = `Copilot Bridge: ${access ? 'OK' : 'Unavailable'} @ ${shown}`;
output?.appendLine(`Started at http://${shown} | Copilot: ${access ? 'ok' : 'unavailable'}`);
if (verbose) {
const tokenSet = token ? 'set' : 'unset';
output?.appendLine(`Config: host=${host} port=${addr?.port ?? portCfg} hist=${hist} maxConcurrent=${maxConc} token=${tokenSet}`);
}
} catch (e: any) {
running = false;
output?.appendLine(`Failed to start: ${e?.stack || e?.message || String(e)}`);