diff --git a/src/index.test.ts b/src/index.test.ts
index 80332ca..62defac 100644
--- a/src/index.test.ts
+++ b/src/index.test.ts
@@ -473,14 +473,35 @@ describe("Perplexity MCP Server", () => {
     const models = ["sonar-pro", "sonar-deep-research", "sonar-reasoning-pro"];
 
     for (const model of models) {
-      const mockResponse = {
-        choices: [{ message: { content: `Response from ${model}` } }],
-      };
-
-      global.fetch = vi.fn().mockResolvedValue({
-        ok: true,
-        json: async () => mockResponse,
-      } as Response);
+      if (model === "sonar-deep-research") {
+        // sonar-deep-research uses streaming, so provide an SSE mock
+        const sseData = [
+          `data: ${JSON.stringify({ choices: [{ delta: { content: "Response " } }] })}\n\n`,
+          `data: ${JSON.stringify({ choices: [{ delta: { content: `from ${model}` } }] })}\n\n`,
+          `data: [DONE]\n\n`,
+        ].join("");
+
+        const stream = new ReadableStream({
+          start(controller) {
+            controller.enqueue(new TextEncoder().encode(sseData));
+            controller.close();
+          },
+        });
+
+        global.fetch = vi.fn().mockResolvedValue({
+          ok: true,
+          body: stream,
+        } as unknown as Response);
+      } else {
+        const mockResponse = {
+          choices: [{ message: { content: `Response from ${model}` } }],
+        };
+
+        global.fetch = vi.fn().mockResolvedValue({
+          ok: true,
+          json: async () => mockResponse,
+        } as Response);
+      }
 
       const messages = [{ role: "user", content: "test" }];
       const result = await performChatCompletion(messages, model);
diff --git a/src/server.ts b/src/server.ts
index f280959..fc7aa95 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -60,6 +60,77 @@ export function stripThinkingTokens(content: string): string {
   return content.replace(/<think>[\s\S]*?<\/think>/g, '').trim();
 }
 
+export async function consumeSSEStream(response: Response): Promise<ChatCompletionResponse> {
+  const body = response.body;
+  if (!body) {
+    throw new Error("Response body is null");
+  }
+
+  const reader = (body as ReadableStream<Uint8Array>).getReader();
+  const decoder = new TextDecoder();
+
+  let contentParts: string[] = [];
+  let citations: string[] | undefined;
+  let usage: ChatCompletionResponse["usage"] | undefined;
+  let id: string | undefined;
+  let model: string | undefined;
+  let created: number | undefined;
+  let buffer = "";
+
+  while (true) {
+    const { done, value } = await reader.read();
+    if (done) break;
+
+    buffer += decoder.decode(value, { stream: true });
+
+    const lines = buffer.split("\n");
+    // Keep the last potentially incomplete line in the buffer
+    buffer = lines.pop() || "";
+
+    for (const line of lines) {
+      const trimmed = line.trim();
+      if (!trimmed || !trimmed.startsWith("data:")) continue;
+
+      const data = trimmed.slice("data:".length).trim();
+      if (data === "[DONE]") continue;
+
+      try {
+        const parsed = JSON.parse(data);
+
+        if (parsed.id) id = parsed.id;
+        if (parsed.model) model = parsed.model;
+        if (parsed.created) created = parsed.created;
+        if (parsed.citations) citations = parsed.citations;
+        if (parsed.usage) usage = parsed.usage;
+
+        const delta = parsed.choices?.[0]?.delta;
+        if (delta?.content) {
+          contentParts.push(delta.content);
+        }
+      } catch {
+        // Skip malformed JSON chunks (e.g. keep-alive pings)
+      }
+    }
+  }
+
+  const assembled: ChatCompletionResponse = {
+    choices: [
+      {
+        message: { content: contentParts.join("") },
+        finish_reason: "stop",
+        index: 0,
+      },
+    ],
+    ...(citations && { citations }),
+    ...(usage && { usage }),
+    ...(id && { id }),
+    ...(model && { model }),
+    ...(created && { created }),
+  };
+
+  return ChatCompletionResponseSchema.parse(assembled);
+}
+
 export async function performChatCompletion(
   messages: Message[],
   model: string = "sonar-pro",
@@ -74,10 +145,13 @@ export async function performChatCompletion(
   // Read timeout fresh each time to respect env var changes
   const TIMEOUT_MS = parseInt(process.env.PERPLEXITY_TIMEOUT_MS || "300000", 10);
 
+  const useStreaming = model === "sonar-deep-research";
+
   const url = new URL(`${PERPLEXITY_BASE_URL}/chat/completions`);
   const body: Record<string, unknown> = {
     model: model,
     messages: messages,
+    ...(useStreaming && { stream: true }),
     ...(options?.search_recency_filter && { search_recency_filter: options.search_recency_filter }),
     ...(options?.search_domain_filter && { search_domain_filter: options.search_domain_filter }),
     ...(options?.search_context_size && { web_search_options: { search_context_size: options.search_context_size } }),
@@ -125,8 +199,12 @@ export async function performChatCompletion(
 
   let data: ChatCompletionResponse;
   try {
-    const json = await response.json();
-    data = ChatCompletionResponseSchema.parse(json);
+    if (useStreaming) {
+      data = await consumeSSEStream(response);
+    } else {
+      const json = await response.json();
+      data = ChatCompletionResponseSchema.parse(json);
+    }
   } catch (error) {
     if (error instanceof z.ZodError) {
       const issues = error.issues;
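
A note on the buffering in consumeSSEStream: reader.read() can return a chunk that ends mid-event, so the parser holds the trailing partial line in buffer until a later read completes it. Below is a minimal vitest-style sketch of that behavior, assuming consumeSSEStream is exported from src/server.ts as in this diff; the split point and test data are hypothetical, not part of the change set.

// Sketch only, not part of the diff: exercises the chunk-boundary buffering.
import { describe, expect, it } from "vitest";
import { consumeSSEStream } from "./server";

describe("consumeSSEStream chunk boundaries", () => {
  it("reassembles an SSE event split across two reads", async () => {
    const event = `data: ${JSON.stringify({ choices: [{ delta: { content: "hello" } }] })}\n\n`;
    // Split mid-line so the first read ends with an incomplete "data:" line
    const chunks = [event.slice(0, 10), event.slice(10), "data: [DONE]\n\n"];

    const stream = new ReadableStream({
      start(controller) {
        const encoder = new TextEncoder();
        for (const chunk of chunks) {
          controller.enqueue(encoder.encode(chunk));
        }
        controller.close();
      },
    });

    const response = { ok: true, body: stream } as unknown as Response;
    const result = await consumeSSEStream(response);

    expect(result.choices[0].message.content).toBe("hello");
  });
});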