Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,14 +473,35 @@ describe("Perplexity MCP Server", () => {
const models = ["sonar-pro", "sonar-deep-research", "sonar-reasoning-pro"];

for (const model of models) {
const mockResponse = {
choices: [{ message: { content: `Response from ${model}` } }],
};

global.fetch = vi.fn().mockResolvedValue({
ok: true,
json: async () => mockResponse,
} as Response);
if (model === "sonar-deep-research") {
// sonar-deep-research uses streaming, so provide an SSE mock
const sseData = [
`data: ${JSON.stringify({ choices: [{ delta: { content: "Response " } }] })}\n\n`,
`data: ${JSON.stringify({ choices: [{ delta: { content: `from ${model}` } }] })}\n\n`,
`data: [DONE]\n\n`,
].join("");

const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(sseData));
controller.close();
},
});

global.fetch = vi.fn().mockResolvedValue({
ok: true,
body: stream,
} as unknown as Response);
} else {
const mockResponse = {
choices: [{ message: { content: `Response from ${model}` } }],
};

global.fetch = vi.fn().mockResolvedValue({
ok: true,
json: async () => mockResponse,
} as Response);
}

const messages = [{ role: "user", content: "test" }];
const result = await performChatCompletion(messages, model);
Expand Down
82 changes: 80 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,77 @@ export function stripThinkingTokens(content: string): string {
return content.replace(/<think>[\s\S]*?<\/think>/g, '').trim();
}

/**
 * Reads a Server-Sent Events (SSE) response body to completion and assembles
 * the streamed chunks into a single, non-streaming style completion object.
 *
 * Each `data:` line is parsed as JSON. Text found at `choices[0].delta.content`
 * is concatenated in arrival order; for `id`, `model`, `created`, `citations`
 * and `usage`, the last truthy value seen in the stream wins. `[DONE]`
 * sentinels, non-`data:` lines and payloads that fail to parse are skipped.
 *
 * @param response - Fetch response whose `body` is the SSE byte stream.
 * @returns The assembled object after validation by `ChatCompletionResponseSchema.parse`.
 * @throws Error if `response.body` is null; also propagates stream read errors
 *   and whatever `ChatCompletionResponseSchema.parse` throws.
 */
export async function consumeSSEStream(response: Response): Promise<ChatCompletionResponse> {
  const body = response.body;
  if (!body) {
    throw new Error("Response body is null");
  }

  const reader = (body as ReadableStream<Uint8Array>).getReader();
  const decoder = new TextDecoder();

  const contentParts: string[] = [];
  let citations: string[] | undefined;
  let usage: ChatCompletionResponse["usage"] | undefined;
  let id: string | undefined;
  let model: string | undefined;
  let created: number | undefined;
  let buffer = "";

  // Folds one complete SSE line into the accumulators above.
  const handleLine = (line: string): void => {
    const trimmed = line.trim();
    if (!trimmed.startsWith("data:")) return;

    const data = trimmed.slice("data:".length).trim();
    if (data === "[DONE]") return;

    try {
      const parsed = JSON.parse(data);

      if (parsed.id) id = parsed.id;
      if (parsed.model) model = parsed.model;
      if (parsed.created) created = parsed.created;
      if (parsed.citations) citations = parsed.citations;
      if (parsed.usage) usage = parsed.usage;

      // Only string deltas are text; anything else would be stringified by join().
      const piece: unknown = parsed.choices?.[0]?.delta?.content;
      if (typeof piece === "string" && piece.length > 0) {
        contentParts.push(piece);
      }
    } catch {
      // Skip malformed JSON chunks (e.g. keep-alive pings)
    }
  };

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // stream: true keeps a multi-byte character split across chunks intact.
      buffer += decoder.decode(value, { stream: true });

      const lines = buffer.split("\n");
      // Keep the last potentially incomplete line in the buffer
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        handleLine(line);
      }
    }

    // Flush any bytes the decoder is still holding, then process what is left:
    // the final event is lost otherwise when the stream does not end in "\n".
    buffer += decoder.decode();
    for (const line of buffer.split("\n")) {
      handleLine(line);
    }
  } finally {
    // Release the lock so the body stream is not left locked to this reader.
    reader.releaseLock();
  }

  const assembled: ChatCompletionResponse = {
    choices: [
      {
        message: { content: contentParts.join("") },
        finish_reason: "stop",
        index: 0,
      },
    ],
    ...(citations && { citations }),
    ...(usage && { usage }),
    ...(id && { id }),
    ...(model && { model }),
    ...(created && { created }),
  };

  return ChatCompletionResponseSchema.parse(assembled);
}

export async function performChatCompletion(
messages: Message[],
model: string = "sonar-pro",
Expand All @@ -74,10 +145,13 @@ export async function performChatCompletion(
// Read timeout fresh each time to respect env var changes
const TIMEOUT_MS = parseInt(process.env.PERPLEXITY_TIMEOUT_MS || "300000", 10);

const useStreaming = model === "sonar-deep-research";

const url = new URL(`${PERPLEXITY_BASE_URL}/chat/completions`);
const body: Record<string, unknown> = {
model: model,
messages: messages,
...(useStreaming && { stream: true }),
...(options?.search_recency_filter && { search_recency_filter: options.search_recency_filter }),
...(options?.search_domain_filter && { search_domain_filter: options.search_domain_filter }),
...(options?.search_context_size && { web_search_options: { search_context_size: options.search_context_size } }),
Expand Down Expand Up @@ -125,8 +199,12 @@ export async function performChatCompletion(

let data: ChatCompletionResponse;
try {
const json = await response.json();
data = ChatCompletionResponseSchema.parse(json);
if (useStreaming) {
data = await consumeSSEStream(response);
} else {
const json = await response.json();
data = ChatCompletionResponseSchema.parse(json);
}
} catch (error) {
if (error instanceof z.ZodError) {
const issues = error.issues;
Expand Down