Skip to content

tool call streaming implementation#10

Open
shivamhwp wants to merge 40 commits intomainfrom
feat/tool-call-streaming
Open

tool call streaming implementation#10
shivamhwp wants to merge 40 commits intomainfrom
feat/tool-call-streaming

Conversation

@shivamhwp
Copy link
Copy Markdown
Member

@shivamhwp shivamhwp commented Aug 4, 2025

  • on_tool_start_stream events to the buffer.
  • use-stream.ts merges partial chunks and marks them complete.
  • minor refactor of the prompt, was giving the output parsing failure

Summary by CodeRabbit

  • New Features
    • Live tool output streaming with progress and finalization, including improved extraction and checkpoint refresh.
    • New StreamingOutput UI for terminal-like live updates.
    • Google Calendar/Gmail tools now stream progress and results.
    • Enhanced planner/replanner prompts for stricter, schema-based outputs.
    • Safer, richer document previews with error handling and secure URL opening.
    • TooltipButton supports a disabled state.
  • Bug Fixes
    • More robust AI message parsing to prevent rendering errors.
    • Improved edit submission validation and error feedback.
  • Refactor
    • Consolidated tool utilities and grouped MCP tools for clearer organization.
  • Style
    • Minor UI tweaks (icons, colors, font sizing).
  • Chores
    • Updated TypeScript; removed an obsolete dev script.

- Standardized code formatting across multiple files, ensuring consistent style and readability.
- Enhanced the SearchResultDisplay component with improved styling and layout for better user experience.
- Updated the useStream hook to handle tool chunk processing more effectively, allowing for smoother data handling.
- Minor adjustments made to prompt creation functions for clarity and consistency in output formatting.
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Aug 4, 2025

Walkthrough

Implements custom-event-driven tool streaming and lazy checkpoint refresh in LangChain integration, overhauls planner/replanner prompts, refactors Google tools to a tool(...) wrapper with progress events, restructures MCP tool return to include grouping, adds a StreamingOutput UI, introduces tool formatting utils, and applies assorted UI/UX, hooks, and minor formatting updates.

Changes

Cohort / File(s) Summary
Streaming & checkpointing core
convex/langchain/index.ts
Adds buffered ToolChunkGroup streaming, lazy checkpoint refresh during flush, completedSteps extraction, custom event handling for tool_stream/tool_progress, unified tool output buffering, and final flush with latest checkpoint.
Prompt overhaul
convex/langchain/prompts.ts
Replaces planner/replanner prompts with SystemMessage-based, schema-driven JSON outputs; extends schemas for respond_to_user; adjusts tool headers and guidance text.
Tooling refactors & progress events
convex/langchain/tools/googleTools.ts, convex/langchain/tools/retrievalTools.ts, convex/langchain/tools/mcpTools.ts
Google tools migrated to tool(...) API with dispatchCustomEvent progress/error streaming; retrieval vectorSearch emits tool_progress events; MCP tools now return { tools, groupedTools } and centralize file upload to MCP machines.
Agent, helpers, state
convex/langchain/agent.ts, convex/langchain/helpers.ts, convex/langchain/state.ts
Replanner normalizes outputs and preserves pastSteps; getAvailableTools uses settled promises and grouped toolkits; file-id extraction hardened; minor signature formatting; state string trailing commas removed.
Chat message components
src/components/chat/messages/ai-message/ai-message.tsx, .../tool-message/index.tsx, .../tool-message/search-results.tsx, .../reasoning.tsx, .../streaming-message.tsx, src/components/chat/messages/index.tsx
Safer AI content parsing; formatting-only in tool-message; lucide ChevronDown swap; reasoning state init tweak; stable React keys for streaming; minor header color change.
UI components
src/components/ui/streaming-output.tsx, src/components/ui/tool-accoordion.tsx, src/components/ui/accordion.tsx, src/components/ui/tooltip-button.tsx
New StreamingOutput component for terminal-like streaming; ToolAccordion prop signature drops input (but JSX still references it); AccordionTrigger param order; TooltipButton adds disabled prop with accessible behavior.
Hooks
src/hooks/chats/use-stream.ts, src/hooks/chats/use-messages.ts
use-stream formatting only; use-messages splits effects for thread vs streaming state updates; no API changes.
Utilities
src/lib/tool-utils.ts
New helpers: cleanToolName, formatToolInput/Output, getToolStatusText/Color for tool display/status formatting.
Document dialog
src/components/document-dialog.tsx
Safer URL handling, explicit preview error state, query-driven data fetch, robust download/open logic, accessibility improvements.
User utils bar
src/components/chat/messages/utils-bar/user-utils-bar.tsx
Input validation with toasts, refined mutation payloads, attachment extraction, keyboard submit, improved error handling.
Utils bar index
src/components/chat/messages/utils-bar/index.ts
Removes non-null assertions on chatId; formatting.
Convex chat messages
convex/chatMessages/helpers.ts
Formatting-only changes; logic unchanged.
Settings route
src/routes/settings/route.lazy.tsx
Removes text-sm from nav link class.
Package & scripts
package.json
Removes odev script; bumps TypeScript to ^5.9.0-beta.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant UI as Frontend UI
  participant Stream as useStream Hook
  participant Convex as convex/langchain/index.ts
  participant LG as LangGraph Runtime
  participant Tool as Tool (google/retrieval/MCP)
  participant CKPT as Checkpoint Store

  UI->>Stream: Start streaming
  Stream->>Convex: Request stream
  Convex->>LG: Run agent/graph
  LG-->>Convex: AI tokens/chunks
  Convex->>Convex: Buffer AI chunks

  rect rgba(220,240,255,0.4)
    Tool-->>Convex: dispatchCustomEvent(tool_progress/tool_stream)
    Convex->>Convex: appendToolChunk(buffer, {chunk,isComplete?})
  end

  Tool-->>Convex: on_tool_end/on_tool_error (final)
  Convex->>Convex: mark pendingCheckpointRefresh = true

  alt Flush cycle
    Convex->>CKPT: If pending, fetch checkpoint
    CKPT-->>Convex: Checkpoint
    Convex->>Stream: Emit buffered chunks (+completedSteps)
  end

  LG-->>Convex: Stream finished
  Convex->>CKPT: Fetch latest checkpoint
  CKPT-->>Convex: Checkpoint
  Convex->>Stream: Final flush (chunks + completedSteps)
  Stream-->>UI: Render AI, tool outputs (StreamingOutput/ToolAccordion)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • ui overhaul  #12 — Also modifies ToolAccordion, including prop signature changes around input handling.
  • update: artifacts + projects. #6 — Modifies planner/replanner prompts in the same module, overlapping with prompt construction changes here.

Suggested labels

imp and noice

Suggested reviewers

  • mantrakp04

Poem

I buffered a burrow of bytes today,
Little tool-prints hopping the streaming way.
Checkpoints nuzzle, plans replanned tight—
Cursor blinks softly in terminal light.
With whiskered wisdom I ship this feat,
Carrots committed, reviews to eat. 🥕✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/tool-call-streaming

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

- Updated subproject commit reference for mcps.
- Introduced StreamingOutput component for better handling of streaming messages.
- Improved ToolMessage component to utilize StreamingOutput for displaying message content and status.
- Enhanced SearchResultDisplay styling and adjusted ToolAccordion for better user experience.
- Added utility functions for tool display and formatting in tool-utils.ts.
@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages bot commented Aug 5, 2025

Deploying zerobsv2 with  Cloudflare Pages  Cloudflare Pages

Latest commit: 36067b8
Status:🚫  Build failed.

View logs

coderabbitai[bot]

This comment was marked as resolved.

…ed state handling

- Introduced a MessageAdditionalKwargs interface in ToolMessage for better type safety.
- Updated StreamingOutput to handle content display based on isComplete state more effectively.
- Refactored ToolAccordion to utilize a new getContentClassName function for cleaner styling logic based on isComplete status.
- Simplified tool name mapping in tool-utils.ts for better maintainability.
…unctionality

- Replaced string-based system messages with structured SystemMessage objects in createPlannerPrompt and createReplannerPrompt functions for better type safety and readability.
- Updated toolbar icon for "Artifacts" from FileIcon to FileCode2 for enhanced visual representation.
- Revised todo list in temp.md to reflect current priorities and improvements, including UX enhancements and integration updates.
coderabbitai[bot]

This comment was marked as resolved.

@shivamhwp shivamhwp changed the title minor update on tool call streaming. tool call streaming implementation Aug 13, 2025
- Updated Google tools to include detailed streaming events and improved error handling for API requests.
- Refactored MCP tools to wrap tool invocations with descriptive streaming events for better user feedback.
- Enhanced retrieval tools with improved document search capabilities and error messaging.
- Improved UI components for displaying search results and streaming outputs, including collapsible sections and better formatting.
- Added date formatting for search results and refined tool accordion display for clarity.
coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as outdated.

- Introduced a new feedback settings route for user communication.
- Created a dedicated templates file for MCPs, consolidating template definitions.
- Updated various components to utilize the new MCP templates structure.
- Removed unused dropdown menu code from the TopNav component for cleaner UI.
coderabbitai[bot]

This comment was marked as outdated.

- Removed unnecessary trailing commas and adjusted function signatures for consistency.
- Enhanced the getAvailableTools function to return structured tool data more clearly.
- Updated the StreamingOutput component to simplify state toggling logic.
- Improved the ToolAccordion component's loader display for better visual clarity.
coderabbitai[bot]

This comment was marked as outdated.

- Updated the tool handling logic to utilize a new helper function for appending tool chunk data, enhancing code readability.
- Refactored the extraction of completed steps from checkpoints into a dedicated function for better organization and clarity.
- Adjusted various tool stream events to use the new helper function for consistency in data formatting.
- Fixed minor formatting issues in retrieval tool messages for improved user feedback.
- Removed unnecessary divider from the ToolBar component for a cleaner layout.
- Enhanced code readability in ToolkitToggles by formatting and restructuring logic.
- Updated text color in ChatMessages for better visual consistency.
- Corrected Discord link in feedback settings to point to the appropriate channel.
coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as outdated.

- Streamlined custom event handling in chat actions for better data validation and error handling.
- Updated ChatMessages component styling for improved layout consistency.
- Refactored feedback settings to use constants for URLs, enhancing maintainability and readability.
coderabbitai[bot]

This comment was marked as outdated.

…at actions

- Introduced a mechanism to defer checkpoint refresh until necessary, enhancing performance during chat processing.
- Updated comments for better clarity on checkpoint management and event handling.
- Cleaned up formatting in state validation schemas for improved readability.
…ent functions

- Cleaned up function signatures by removing unnecessary trailing commas for consistency.
- Enhanced response handling in the replanner function to normalize various output shapes into a single string.
- Updated prompt templates to include explicit schema definitions, reducing model output errors.
- Improved AI message component to ensure content is a string before parsing, enhancing robustness.
coderabbitai[bot]

This comment was marked as outdated.

…documentation

- Added error handling in the replanner function to manage structured output parsing failures and provide fallback responses.
- Updated prompt schema documentation to clarify requirements for "single" and "parallel" plan items, ensuring correct data structures.
- Improved AI message component to ensure content is processed correctly, enhancing overall robustness.
coderabbitai[bot]

This comment was marked as outdated.

- Removed the `complete` flag from `tool_progress` events in Google and retrieval tools to simplify error handling.
- Updated error dispatch logic to maintain consistency across different tools.
- Cleaned up comments and improved code readability.
coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as resolved.

- Reformatted import statements for improved consistency and readability.
- Added error handling for preview loading in DocumentDialog, providing user feedback on failures.
- Updated the handling of document previews and download links to ensure better user experience.
- Streamlined key event handling in TopNav for toggling the resizable panel, improving accessibility.
- Enhanced user session management by refining user data fetching logic in TopNav.
coderabbitai[bot]

This comment was marked as resolved.

- Removed unnecessary non-null assertions for document IDs in DocumentDialog to enhance code safety.
- Updated icon imports in ModelPopover and SearchResults to maintain consistency with the new icon set.
- Refactored SearchResultDisplay to improve layout and accessibility, including changes to button elements for better interaction.
coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as resolved.

- Updated stats.html to reflect new asset names for improved clarity.
- Enhanced DocumentDialog to validate URLs before setting the preview, providing user feedback for invalid URLs.
- Improved ModelPopover's drag-and-drop functionality by adding checks for valid drop targets and ensuring safe index updates.
- Streamlined UserUtilsBar component for better readability and consistency in file upload handling and message submission logic.
coderabbitai[bot]

This comment was marked as resolved.

2. no submit, if no edited text.
3. better tooltip-button.tsx
coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as outdated.

…ieval tools and chat components

- Standardized formatting in `index.ts` for better readability.
- Enhanced error handling in `getRetrievalTools` to dispatch appropriate events on search failure.
- Removed unnecessary click handler in `model-popover.tsx`.
- Improved output handling in `use-stream.ts` to ensure proper stringification of tool outputs.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
convex/langchain/index.ts (4)

80-116: Convex guideline: add returns validator to internalAction.

All Convex functions must declare returns.

 export const generateTitle = internalAction({
   args: v.object({
     chat: Chats.doc,
     message: ChatMessages.doc,
   }),
+  returns: v.null(),

118-147: Convex guideline: add returns validator to action chat.

 export const chat = action({
   args: v.object({
     chatId: v.id("chats"),
     model: v.optional(v.string()),
   }),
+  returns: v.null(),

475-495: Convex guideline: add returns validator to regenerate action.

 export const regenerate = action({
   args: v.object({
     messageId: v.id("chatMessages"),
   }),
+  returns: v.null(),

497-616: Convex guideline: declare explicit returns schema for branchChat.

It returns { newChatId }; add matching returns.

 export const branchChat = action({
   args: v.object({
     chatId: v.id("chats"),
     branchFrom: v.id("chatMessages"),
     model: v.optional(v.string()),
     editedContent: v.optional(v.object({
       text: v.optional(v.string()),
       documents: v.optional(v.array(v.id("documents"))),
     })),
   }),
+  returns: v.object({ newChatId: v.id("chats") }),
♻️ Duplicate comments (19)
convex/langchain/tools/retrievalTools.ts (14)

223-229: Normalize optional topic to undefined, not null.

Some clients reject null; omit field when not provided.

-            topic: topic,
+            topic: topic ?? undefined,

41-50: Bind custom events to the correct tool run (add toolConfig and forward it).

Without passing toolConfig, UI cannot associate chunks with the current tool call.

-    func: async ({
-      queries,
-      limit = 10,
-    }: {
-      queries: string[];
-      limit?: number;
-    }) => {
-      await dispatchCustomEvent("tool_progress", {
-        chunk: "Initializing vector store...",
-      });
+    func: async (
+      { queries, limit = 10 }: { queries: string[]; limit?: number },
+      toolConfig?: any
+    ) => {
+      await dispatchCustomEvent(
+        "tool_progress",
+        { chunk: "Initializing vector store..." },
+        toolConfig
+      );

Repeat: pass toolConfig to every dispatchCustomEvent in this tool.


61-67: Guard against missing projectId to avoid runtime throw.

Accessing config.chat.projectId! can throw when called outside a project context.

+      if (!config.chat.projectId) {
+        const msg = "No project selected. Vector search is only available within a project.";
+        await dispatchCustomEvent("tool_stream", { chunk: msg, complete: true }, toolConfig);
+        return msg;
+      }
       const includedProjectDocuments = await config.ctx.runQuery(
         internal.projectDocuments.queries.getSelected,
         {
           projectId: config.chat.projectId!,
           selected: true,
         }
       );

69-73: Terminal event should mark completion and use tool_stream.

Use complete: true so the UI stops showing in-progress.

-      if (includedProjectDocuments.length === 0) {
-        const msg = "No project documents available for retrieval.";
-        await dispatchCustomEvent("tool_progress", { chunk: msg });
-        return msg;
-      }
+      if (includedProjectDocuments.length === 0) {
+        const msg = "No project documents available for retrieval.";
+        await dispatchCustomEvent("tool_stream", { chunk: msg, complete: true }, toolConfig);
+        return msg;
+      }

85-95: Vector filter compares entire metadata object; match metadata.source instead.

Current q.eq("metadata", {...}) won’t match as intended; compare the nested field.

-          filter: (q) =>
-            q.or(
-              // Assuming documentId is stored in the `source` field of metadata
-              ...includedProjectDocuments.map((document) =>
-                q.eq("metadata", {
-                  source: document.documentId,
-                }),
-              ),
-            ),
+          filter: (q) =>
+            q.or(
+              ...includedProjectDocuments.map((d) =>
+                q.eq(q.field("metadata", "source"), d.documentId)
+              ),
+            ),

If q.field syntax differs in your version, use the store’s documented nested-field predicate.


122-143: Filter nulls from Promise.all to ensure Document[].

Nulls propagate and break consumers expecting Document[].

-      const documents = await Promise.all(
+      const documents = (await Promise.all(
         allResults.map(async (doc) => {
           const projectDocument = documentsMap.get(
             (doc.metadata as any).source,
           );
           if (!projectDocument) {
             return null;
           }
           const url = await getDocumentUrl(config.ctx, projectDocument.key);

           return new Document({
             pageContent: doc.pageContent,
             metadata: {
               document: projectDocument,
               source: url,
               type: "document",
               query: doc.metadata.query,
             },
           });
         }),
-      );
+      )).filter(Boolean) as Document[];

144-148: Signal terminal completion before returning results.

Mark the stream complete to unblock the UI.

-      await dispatchCustomEvent("tool_progress", {
-        chunk: "Formatting final output...",
-      });
+      await dispatchCustomEvent(
+        "tool_stream",
+        { chunk: "Formatting final output…", complete: true },
+        toolConfig
+      );
       return returnString ? JSON.stringify(documents, null, 0) : documents;

195-201: Also accept toolConfig in webSearch and propagate it to all events.

Required for correct binding to the tool run.

-    func: async ({
-      queries,
-      topic,
-    }: {
-      queries: string[];
-      topic?: string | null;
-    }) => {
+    func: async (
+      { queries, topic }: { queries: string[]; topic?: string | null },
+      toolConfig?: any
+    ) => {

Propagate toolConfig to all dispatchCustomEvent calls in this function.


202-218: Forward toolConfig to dispatchCustomEvent.

Events currently lack run_id/name.

-      await dispatchCustomEvent("tool_progress", {
-        chunk: "Preparing web search...",
-      });
+      await dispatchCustomEvent(
+        "tool_progress",
+        { chunk: "Preparing web search..." },
+        toolConfig
+      );-      await dispatchCustomEvent("tool_progress", {
-        chunk: "Searching the Internet...",
-      });
+      await dispatchCustomEvent(
+        "tool_progress",
+        { chunk: "Searching the Internet..." },
+        toolConfig
+      );

Repeat for every dispatchCustomEvent below.


232-236: Don’t return early on a single empty query; continue aggregating.

Early return discards prior/remaining results.

-          const msg = `No results found for query: ${query}`;
-          await dispatchCustomEvent("tool_progress", { chunk: msg });
-          return msg;
+          const msg = `No results found for query: ${query}`;
+          await dispatchCustomEvent("tool_progress", { chunk: msg }, toolConfig);
+          continue;

262-269: Use complete: true (not done) and pass toolConfig.

on_custom_event only recognizes payload.complete.

-      await dispatchCustomEvent("tool_stream", {
-        chunk: "No results found.",
-        done: true,
-        error: false,
-      });
+      await dispatchCustomEvent(
+        "tool_stream",
+        { chunk: "No results found.", complete: true },
+        toolConfig
+      );

274-280: Mark success as complete and include toolConfig.

Aligns with the tool streaming protocol.

-      await dispatchCustomEvent("tool_stream", {
-        chunk: "Search completed successfully.",
-        done: true,
-        error: false,
-      });
+      await dispatchCustomEvent(
+        "tool_stream",
+        { chunk: `Found ${documents.length} results. Formatting…`, complete: true },
+        toolConfig
+      );

281-294: Mark failure as complete (use complete, not done) and forward toolConfig.

Prevents “stuck” progress on errors.

-      await dispatchCustomEvent("tool_stream", {
-        chunk: errorMessage,
-        done: true,
-        error: true,
-      });
+      await dispatchCustomEvent(
+        "tool_stream",
+        { chunk: errorMessage, complete: true },
+        toolConfig
+      );

214-230: Implement Exa → DuckDuckGo fallback when EXA_API_KEY is absent.

PR description promises DDG fallback; code always uses Exa.

Minimal branching (pseudo-diff; add imports as needed):

-      const exa = new Exa(EXA_API_KEY, undefined);
-      let allDocuments = [];
+      let allDocuments: Document[] = [];
+      const useExa = Boolean(EXA_API_KEY);
+      const exa = useExa ? new Exa(EXA_API_KEY, undefined) : null;-      const searchResponse = (
-        await exa.searchAndContents(query, {
-          numResults: Math.ceil(10 / queries.length),
-          type: "auto",
-          useAutoprompt: false,
-          topic: topic ?? undefined,
-          text: true,
-        })
-      ).results;
+      const searchResponse = useExa
+        ? (
+            await exa!.searchAndContents(query, {
+              numResults: Math.ceil(10 / queries.length),
+              type: "auto",
+              useAutoprompt: false,
+              topic: topic ?? undefined,
+              text: true,
+            })
+          ).results
+        : await ddgSearch(query, Math.ceil(10 / queries.length)); // map to { title,url,text,... }

I can wire up ddgSearch for you (duck-duck-scrape or similar) if desired.

src/hooks/chats/use-stream.ts (2)

185-189: Avoid serializing null/undefined to '""' for completed ToolMessage content.

Normalize content to empty string when output is null/undefined.

-            content:
-              typeof chunk.output === "string"
-                ? chunk.output
-                : JSON.stringify(chunk.output ?? ""),
+            content:
+              typeof chunk.output === "string"
+                ? chunk.output
+                : chunk.output !== undefined && chunk.output !== null
+                  ? JSON.stringify(chunk.output)
+                  : "",

201-205: Same fix for in-flight ToolMessage content normalization.

-            content:
-              typeof chunk.output === "string"
-                ? chunk.output
-                : JSON.stringify(chunk.output ?? ""),
+            content:
+              typeof chunk.output === "string"
+                ? chunk.output
+                : chunk.output !== undefined && chunk.output !== null
+                  ? JSON.stringify(chunk.output)
+                  : "",
convex/langchain/index.ts (2)

46-78: Don’t conflate completed and planned steps; return plan-only (active) steps.

stream.completedSteps is used as “active steps” downstream; mixing pastSteps mislabels UI.

-function extractCompletedStepsFromCheckpoint(
+function extractCompletedStepsFromCheckpoint(
   checkpoint: typeof GraphState.State | null | undefined
 ): string[] {
-  const completedSteps: string[] = [];
-
-  const pastSteps = (checkpoint as any)?.pastSteps as
-    | Array<[string, unknown[]]>
-    | undefined;
-  if (pastSteps && pastSteps.length > 0) {
-    completedSteps.push(...pastSteps.map((ps) => ps[0]));
-  }
-
-  const plan = (checkpoint as any)?.plan as
+  const plan = (checkpoint as any)?.plan as
     | Array<
         | {
             type: "parallel";
             data: Array<{ step: string; context: string }>;
           }
         | { type: "single"; data: { step: string; context: string } }
       >
     | undefined;
-  if (plan && plan.length > 0) {
-    const first = plan[0];
-    if (first.type === "parallel") {
-      completedSteps.push(...first.data.map((s) => s.step));
-    } else {
-      completedSteps.push(first.data.step);
-    }
-  }
-
-  return completedSteps;
+  if (!plan || plan.length === 0) return [];
+  const steps: string[] = [];
+  for (const item of plan) {
+    if (item.type === "parallel") {
+      steps.push(...item.data.map((s) => s.step));
+    } else {
+      steps.push(item.data.step);
+    }
+  }
+  return steps;
}

293-360: Don’t exit the streamer on benign/malformed custom events; use continue.

return here stops processing subsequent events.

-                  return;
+                  continue;-                  return;
+                  continue;-                    return;
+                    continue;

Also, accept done as an alias for complete:

-                const isComplete =
-                  eventName === "tool_progress"
-                    ? false
-                    : payload.complete === true;
+                const isComplete =
+                  eventName === "tool_progress"
+                    ? false
+                    : payload.complete === true || payload.done === true;
src/components/chat/input/toolbar/model-popover.tsx (1)

195-214: Fix drop-outside crash/mis-ordering (guard when over is null).

Dropping outside a droppable yields over = null; current code computes newIndex = -1 and may reorder incorrectly.

Apply:

-    if (active.id !== over?.id) {
-      const oldIndex = orderedModels.findIndex(
-        (model) => model.model_name === active.id,
-      );
-      const newIndex = orderedModels.findIndex(
-        (model) => model.model_name === over?.id,
-      );
-
-      const newOrder = arrayMove(
-        orderedModels.map((m) => m.model_name),
-        oldIndex,
-        newIndex,
-      );
-
-      onReorderModels(newOrder);
-    }
+    if (!over) return;
+    if (active.id === over.id) return;
+
+    const oldIndex = orderedModels.findIndex(
+      (m) => m.model_name === active.id,
+    );
+    const newIndex = orderedModels.findIndex(
+      (m) => m.model_name === over.id,
+    );
+    if (oldIndex < 0 || newIndex < 0) return;
+
+    const newOrder = arrayMove(
+      orderedModels.map((m) => m.model_name),
+      oldIndex,
+      newIndex,
+    );
+    onReorderModels(newOrder);
🧹 Nitpick comments (9)
convex/langchain/tools/retrievalTools.ts (1)

238-240: Optional: add a terminal “Formatting…” stream before final return.

Helps UI convey completion of processing.

-          await dispatchCustomEvent("tool_progress", {
-            chunk: `Found ${searchResponse.length} results.`,
-          });
+          await dispatchCustomEvent(
+            "tool_progress",
+            { chunk: `Found ${searchResponse.length} results.` },
+            toolConfig
+          );

And just before returning final docs (see lines 274–280), emit a “tool_stream” complete event.

src/hooks/chats/use-stream.ts (1)

61-65: Harden JSON.parse for chunk payloads.

A malformed chunk will throw and stall polling; add try/catch per item.

Proposed pattern (apply within the map):

-  (chunkStr: string) => JSON.parse(chunkStr) as ChunkGroup,
+  (chunkStr: string) => {
+    try { return JSON.parse(chunkStr) as ChunkGroup; }
+    catch { return undefined as any; }
+  },

Then filter falsy entries before merging. If you want, I can submit a targeted patch.

convex/langchain/index.ts (1)

174-187: Reuse extractor for completed steps in flush; avoid duplicate logic.

-                completedSteps: [
-                  ...(localCheckpoint?.pastSteps?.map(
-                    (pastStep) => pastStep[0]
-                  ) ?? []),
-                  ...(localCheckpoint?.plan && localCheckpoint.plan.length > 0
-                    ? [
-                        ...(localCheckpoint.plan[0].type === "parallel"
-                          ? localCheckpoint.plan[0].data.map(
-                              (step) => step.step
-                            )
-                          : [localCheckpoint.plan[0].data.step]),
-                      ]
-                    : []),
-                ],
+                completedSteps: extractCompletedStepsFromCheckpoint(localCheckpoint),
src/components/chat/input/toolbar/model-popover.tsx (6)

375-383: Don't hijack typing in inputs/contenteditable for Alt+number.

Skip global shortcut when user is typing.

-      // Alt+number shortcuts (global)
-      if (e.altKey && !e.repeat) {
+      // Alt+number shortcuts (global)
+      if (e.altKey && !e.repeat) {
+        const target = e.target as HTMLElement | null;
+        if (
+          target &&
+          ((target.tagName === "INPUT" || target.tagName === "TEXTAREA") ||
+            target.isContentEditable)
+        ) {
+          return;
+        }
         const digit = parseInt(e.key, 10);
         if (!isNaN(digit) && digit >= 1) {
           e.preventDefault();
           selectModelByIndex(digit - 1);
         }
         return;
       }

350-356: Memoize filtered list to reduce re-binding of global key handler.

Avoid recreating listeners on every render.

-  const filteredModels = visibleModels.filter(
-    (model) =>
-      model.model_name.toLowerCase().includes(searchModel.toLowerCase()) ||
-      model.label.toLowerCase().includes(searchModel.toLowerCase()),
-  );
+  const filteredModels = useMemo(
+    () =>
+      visibleModels.filter(
+        (model) =>
+          model.model_name.toLowerCase().includes(searchModel.toLowerCase()) ||
+          model.label.toLowerCase().includes(searchModel.toLowerCase()),
+      ),
+    [visibleModels, searchModel],
+  );

Add missing import:

-import { useState, useEffect, useCallback, useRef } from "react";
+import { useState, useEffect, useCallback, useRef, useMemo } from "react";

Also applies to: 29-29


159-170: Add ARIA to visibility toggle (toggle state + label).

Improve screen reader clarity for show/hide.

-        <Button
+        <Button
           size="sm"
           variant="ghost"
           className="h-6 w-6 p-0"
           onClick={() => onToggleVisibility(model.model_name)}
+          aria-pressed={isHidden}
+          aria-label={isHidden ? `Show ${model.label}` : `Hide ${model.label}`}
         >

188-193: Reduce accidental drags: add activationConstraint.

Prevents minor pointer movement from initiating DnD.

-  const sensors = useSensors(
-    useSensor(PointerSensor),
+  const sensors = useSensors(
+    useSensor(PointerSensor, {
+      activationConstraint: { distance: 6 },
+    }),
     useSensor(KeyboardSensor, {
       coordinateGetter: sortableKeyboardCoordinates,
     }),
   );

299-303: Avoid deprecated navigator.platform; harden macOS detection.

Use UA Data when available; fallback to platform string.

-  const isMac =
-    typeof navigator !== "undefined" &&
-    navigator.platform.toUpperCase().indexOf("MAC") >= 0;
+  const isMac =
+    typeof navigator !== "undefined" &&
+    ((navigator as any).userAgentData?.platform === "macOS" ||
+      navigator.platform?.toUpperCase().includes("MAC"));

124-156: DRY the capability tags and icon cluster into a shared component.

Same UI exists in two places; extract to <ModelTags /> to avoid drift.

Example component:

// New helper component (near file bottom or in a shared module)
function ModelTags({
  model,
  size = 4,
}: { model: (typeof models)[number]; size?: 3 | 4 }) {
  const toolSupportTag = getTagInfo("toolSupport");
  const thinkingTagInfo = getTagInfo("thinking");
  const iconSize = `h-${size} w-${size}`;
  return (
    <div className="flex gap-1 items-center opacity-75">
      {model.modalities?.filter((m) => m !== "text").map((modality) => {
        const { icon: Icon, className, parentClassName } = getTagInfo(modality);
        return (
          <div key={modality} className={`p-1 rounded-md ${parentClassName}`}>
            <Icon className={`${iconSize} ${className}`} />
          </div>
        );
      })}
      {model.toolSupport && (
        <div className={`p-1 rounded-md ${toolSupportTag.parentClassName}`}>
          <Hammer className={`${iconSize} ${toolSupportTag.className}`} />
        </div>
      )}
      {model.isThinking && (
        <div className={`p-1 rounded-md ${thinkingTagInfo.parentClassName}`}>
          <thinkingTagInfo.icon className={`${iconSize} ${thinkingTagInfo.className}`} />
        </div>
      )}
    </div>
  );
}

Then replace the duplicated tag blocks in both locations with:

<ModelTags model={model} size={3} />   // in SortableModelItem
<ModelTags model={model} size={4} />   // in list rows

Also applies to: 520-555

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 285c205 and d105046.

📒 Files selected for processing (4)
  • convex/langchain/index.ts (17 hunks)
  • convex/langchain/tools/retrievalTools.ts (2 hunks)
  • src/components/chat/input/toolbar/model-popover.tsx (1 hunks)
  • src/hooks/chats/use-stream.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
convex/**/*.{ts,js}

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

convex/**/*.{ts,js}: ALWAYS use the new function syntax for Convex functions (using query/mutation/internalQuery/internalMutation/action/internalAction with args, returns, and handler).
Always use the v.null() validator when returning a null value from a Convex function.
ALWAYS include argument and return validators for all Convex functions (query, internalQuery, mutation, internalMutation, action, internalAction). If a function doesn't return anything, include returns: v.null() as its output validator.
Use internalQuery, internalMutation, and internalAction to register internal functions (private, only callable by other Convex functions). Use query, mutation, and action to register public functions (exposed to the public Internet). Do NOT use query, mutation, or action for sensitive internal functions.
You CANNOT register a function through the api or internal objects.
When using ctx.runQuery, ctx.runMutation, or ctx.runAction to call a function in the same file, specify a type annotation on the return value to work around TypeScript circularity limitations.
Use ctx.runQuery to call a query, ctx.runMutation to call a mutation, and ctx.runAction to call an action. Only call an action from another action if you need to cross runtimes (e.g., from V8 to Node). Otherwise, pull out shared code into a helper async function.
Try to use as few calls from actions to queries and mutations as possible to avoid race conditions.
All ctx.runQuery, ctx.runMutation, and ctx.runAction calls take a FunctionReference. Do NOT pass the callee function directly.
Use the api object from convex/_generated/api.ts to call public functions and the internal object to call internal functions.
Paginated queries should use the paginationOptsValidator and return objects with page, isDone, and continueCursor properties.
v.bigint() is deprecated for representing signed 64-bit integers. Use v.int64() instead.
Use v.record() for defining a record type. v.map() and v.set() are not supported.
Index fi...

Files:

  • convex/langchain/index.ts
  • convex/langchain/tools/retrievalTools.ts
convex/**/*

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

Convex uses file-based routing; organize files with public query, mutation, or action functions within the convex/ directory.

Files:

  • convex/langchain/index.ts
  • convex/langchain/tools/retrievalTools.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

**/*.ts: You can use the helper TypeScript type Id imported from './_generated/dataModel' to get the type of the id for a given table.
If you need to define a Record, provide the correct key and value types (e.g., Record<Id<'users'>, string>).
Be strict with types, particularly around ids of documents. Use Id<'table'> rather than string for document ids.
Always use as const for string literals in discriminated union types.
When using the Array type, always define arrays as const array: Array = [...];
When using the Record type, always define records as const record: Record<KeyType, ValueType> = {...};

Files:

  • convex/langchain/index.ts
  • src/hooks/chats/use-stream.ts
  • convex/langchain/tools/retrievalTools.ts
🧠 Learnings (4)
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : ALWAYS include argument and return validators for all Convex functions (query, internalQuery, mutation, internalMutation, action, internalAction). If a function doesn't return anything, include returns: v.null() as its output validator.

Applied to files:

  • convex/langchain/index.ts
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : ALWAYS use the new function syntax for Convex functions (using query/mutation/internalQuery/internalMutation/action/internalAction with args, returns, and handler).

Applied to files:

  • convex/langchain/index.ts
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : Always use the v.null() validator when returning a null value from a Convex function.

Applied to files:

  • convex/langchain/index.ts
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : Do NOT use filter in queries. Instead, define an index in the schema and use withIndex instead.

Applied to files:

  • convex/langchain/tools/retrievalTools.ts
🔇 Additional comments (1)
src/components/chat/input/toolbar/model-popover.tsx (1)

238-259: LGTM: Solid DnD wiring and virtualization-friendly list structure.

Correct use of DndContext, SortableContext, and stable item ids.

Comment on lines +337 to +348
const handleModelSelect = async (modelName: string) => {
if (isNewChat) {
setNewChat((prev) => ({ ...prev, model: modelName }));
} else {
await updateChatMutation({
chatId,
updates: { model: modelName },
});
}
setPopoverOpen(false);
setHighlightedIndex(-1);
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Make selection resilient; always reset highlight even if mutation fails.

Ensure UI state cannot get stuck on errors.

-  const handleModelSelect = async (modelName: string) => {
-    if (isNewChat) {
-      setNewChat((prev) => ({ ...prev, model: modelName }));
-    } else {
-      await updateChatMutation({
-        chatId,
-        updates: { model: modelName },
-      });
-    }
-    setPopoverOpen(false);
-    setHighlightedIndex(-1);
-  };
+  const handleModelSelect = async (modelName: string) => {
+    try {
+      if (isNewChat) {
+        setNewChat((prev) => ({ ...prev, model: modelName }));
+        setPopoverOpen(false);
+      } else {
+        await updateChatMutation({ chatId, updates: { model: modelName } });
+        setPopoverOpen(false);
+      }
+    } finally {
+      setHighlightedIndex(-1);
+    }
+  };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const handleModelSelect = async (modelName: string) => {
if (isNewChat) {
setNewChat((prev) => ({ ...prev, model: modelName }));
} else {
await updateChatMutation({
chatId,
updates: { model: modelName },
});
}
setPopoverOpen(false);
setHighlightedIndex(-1);
};
const handleModelSelect = async (modelName: string) => {
try {
if (isNewChat) {
setNewChat((prev) => ({ ...prev, model: modelName }));
setPopoverOpen(false);
} else {
await updateChatMutation({ chatId, updates: { model: modelName } });
setPopoverOpen(false);
}
} finally {
setHighlightedIndex(-1);
}
};
🤖 Prompt for AI Agents
In src/components/chat/input/toolbar/model-popover.tsx around lines 337 to 348,
the UI resets (setPopoverOpen and setHighlightedIndex) only after awaiting the
mutation which can leave the highlight/popover stuck if the mutation throws;
wrap the async update in try/finally so that regardless of success or failure
you call setPopoverOpen(false) and setHighlightedIndex(-1) in the finally block,
and optionally catch the error to surface it (e.g., show toast or console.error)
before rethrowing or swallowing as appropriate.

- Improved code formatting and consistency across chat message helpers and components.
- Streamlined message parsing and rendering logic in StreamingMessage and ToolMessage components.
- Enhanced error handling and user feedback in message submission and retrieval processes.
- Updated utility functions for better navigation and branch management in chat threads.
- Added @convex-dev/persistent-text-streaming dependency to enhance message streaming capabilities.
- Upgraded TypeScript version to 5.9.0-beta for improved type safety and features.
- Refactored chat input toolbar and user message components for better readability and maintainability.
- Enhanced document handling in user messages and improved rendering logic for mixed content types.
coderabbitai[bot]

This comment was marked as outdated.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (6)
convex/langchain/tools/mcpTools.ts (2)

21-25: API shape change: ensure all call sites handle { tools, groupedTools }

This is a breaking change vs returning an array. Please update consumers accordingly or provide a backwards-compat shim.

Use:

#!/bin/bash
rg -n -A3 -B3 'getMCPTools\('
rg -n -A2 -B2 'mcpTools(\.map|\[0\])'

199-224: Sanitize filenames and validate URLs to prevent injection/SSRF during uploads

documentDoc.name and getDocumentUrl are untrusted. Validate scheme/host and sanitize names before passing to Fly upload.

Apply:

-		const files: { name: string; url: string }[] = (
+		const files: { name: string; url: string }[] = (
 			await Promise.all(
 				fileIds.map(async (documentId, index) => {
 					try {
 						const documentDoc = await ctx.runQuery(
 							internal.documents.crud.read,
 							{
 								id: documentId as Id<"documents">,
 							},
 						);
 						if (!documentDoc) {
 							return null;
 						}
 						// Include various document types for upload (file, image, github, text)
-						const url = await getDocumentUrl(ctx, documentDoc.key);
-						return {
-							name: `${index}_${documentDoc.name}`,
-							url,
-						};
+						const rawUrl = await getDocumentUrl(ctx, documentDoc.key);
+						try {
+							const u = new URL(rawUrl);
+							if (!["http:", "https:"].includes(u.protocol)) return null;
+							const host = u.hostname.toLowerCase();
+							const isIp = /^\d{1,3}(\.\d{1,3}){3}$/.test(host);
+							const isPrivateIp =
+								isIp &&
+								(/^(10\.|127\.|169\.254\.|192\.168\.)/.test(host) ||
+									/^172\.(1[6-9]|2\d|3[0-1])\./.test(host));
+							if (
+								host === "localhost" ||
+								host === "127.0.0.1" ||
+								host === "::1" ||
+								isPrivateIp
+							) {
+								return null;
+							}
+							const safeName = `${index}_${documentDoc.name}`
+								.replace(/[^a-zA-Z0-9._-]/g, "_")
+								.slice(0, 128);
+							return { name: safeName, url: u.toString() };
+						} catch {
+							return null;
+						}
 					} catch (error) {
 						return null;
 					}
 				}),
 			)
 		).filter((file): file is { name: string; url: string } => file !== null);
convex/langchain/index.ts (4)

518-522: Convex guideline: add returns: v.null() to regenerate action

Keeps schema explicit and type-safe.

Apply:

 export const regenerate = action({
   args: v.object({
     messageId: v.id("chatMessages"),
   }),
+  returns: v.null(),

52-84: Don’t mix completed past steps with planned steps in completedSteps

Merging pastSteps with the first plan entry mislabels planned items as completed, breaking UI expectations.

Apply:

-function extractCompletedStepsFromCheckpoint(
-  checkpoint: typeof GraphState.State | null | undefined,
-): string[] {
-  const completedSteps: string[] = [];
-  const pastSteps = (checkpoint as any)?.pastSteps as
-    | Array<[string, unknown[]]>
-    | undefined;
-  if (pastSteps && pastSteps.length > 0) {
-    completedSteps.push(...pastSteps.map((ps) => ps[0]));
-  }
-  const plan = (checkpoint as any)?.plan as
-    | Array<
-        | { type: "parallel"; data: Array<{ step: string; context: string }> }
-        | { type: "single"; data: { step: string; context: string } }
-      >
-    | undefined;
-  if (plan && plan.length > 0) {
-    const first = plan[0];
-    if (first.type === "parallel") {
-      completedSteps.push(...first.data.map((s) => s.step));
-    } else {
-      completedSteps.push(first.data.step);
-    }
-  }
-  return completedSteps;
-}
+function extractCompletedStepsFromCheckpoint(
+  checkpoint: typeof GraphState.State | null | undefined,
+): string[] {
+  const pastSteps = (checkpoint as any)?.pastSteps as
+    | Array<[string, unknown[]]>
+    | undefined;
+  if (!pastSteps || pastSteps.length === 0) return [];
+  return pastSteps.map((ps) => ps[0]);
+}

Follow-up (optional): if you need planned/active steps, return them separately as plannedSteps from the flush.


86-122: Convex guideline: add returns validator to internalAction(generateTitle)

Actions/internalActions must declare returns. This one returns nothing.

Apply:

 export const generateTitle = internalAction({
   args: v.object({
     chat: Chats.doc,
     message: ChatMessages.doc,
   }),
+  returns: v.null(),

317-385: Stream loop can terminate early: replace returns with continues for malformed custom events

Using return inside the for-await loop exits the streamer and truncates output.

Apply:

-									return;
+									continue;
...
-									return;
+									continue;
...
-										return;
+										continue;

Targets: the three return; statements after the warnings at Lines 328, 349, and 371.

🧹 Nitpick comments (2)
convex/langchain/index.ts (2)

199-213: Use the same helper for completedSteps during periodic flushes

Keeps semantics consistent with the final flush and the UI.

Apply:

-								completedSteps: [
-									...(localCheckpoint?.pastSteps?.map(
-										(pastStep) => pastStep[0],
-									) ?? []),
-									...(localCheckpoint?.plan && localCheckpoint.plan.length > 0
-										? [
-												...(localCheckpoint.plan[0].type === "parallel"
-													? localCheckpoint.plan[0].data.map(
-															(step) => step.step,
-														)
-													: [localCheckpoint.plan[0].data.step]),
-											]
-										: []),
-								],
+								completedSteps:
+									extractCompletedStepsFromCheckpoint(localCheckpoint),

414-417: Avoid non-null assertion on possibly null checkpoint

Tiny safety/readability win.

Apply:

-			finalMessages = checkpoint?.messages!;
+			finalMessages = checkpoint?.messages ?? null;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0fd8609 and 36067b8.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • convex/chatMessages/helpers.ts (2 hunks)
  • convex/langchain/index.ts (1 hunks)
  • convex/langchain/tools/mcpTools.ts (1 hunks)
  • package.json (1 hunks)
  • src/components/chat/messages/ai-message/reasoning.tsx (1 hunks)
  • src/components/chat/messages/ai-message/tool-message/index.tsx (1 hunks)
  • src/components/chat/messages/ai-message/tool-message/search-results.tsx (1 hunks)
  • src/components/ui/tool-accoordion.tsx (1 hunks)
  • src/hooks/chats/use-stream.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/hooks/chats/use-stream.ts
  • src/components/chat/messages/ai-message/tool-message/index.tsx
  • convex/chatMessages/helpers.ts
  • src/components/chat/messages/ai-message/reasoning.tsx
  • src/components/ui/tool-accoordion.tsx
  • src/components/chat/messages/ai-message/tool-message/search-results.tsx
🧰 Additional context used
📓 Path-based instructions (4)
package.json

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

Always add @types/node to your package.json when using any Node.js built-in modules.

Files:

  • package.json
convex/**/*.{ts,js}

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

convex/**/*.{ts,js}: ALWAYS use the new function syntax for Convex functions (using query/mutation/internalQuery/internalMutation/action/internalAction with args, returns, and handler).
Always use the v.null() validator when returning a null value from a Convex function.
ALWAYS include argument and return validators for all Convex functions (query, internalQuery, mutation, internalMutation, action, internalAction). If a function doesn't return anything, include returns: v.null() as its output validator.
Use internalQuery, internalMutation, and internalAction to register internal functions (private, only callable by other Convex functions). Use query, mutation, and action to register public functions (exposed to the public Internet). Do NOT use query, mutation, or action for sensitive internal functions.
You CANNOT register a function through the api or internal objects.
When using ctx.runQuery, ctx.runMutation, or ctx.runAction to call a function in the same file, specify a type annotation on the return value to work around TypeScript circularity limitations.
Use ctx.runQuery to call a query, ctx.runMutation to call a mutation, and ctx.runAction to call an action. Only call an action from another action if you need to cross runtimes (e.g., from V8 to Node). Otherwise, pull out shared code into a helper async function.
Try to use as few calls from actions to queries and mutations as possible to avoid race conditions.
All ctx.runQuery, ctx.runMutation, and ctx.runAction calls take a FunctionReference. Do NOT pass the callee function directly.
Use the api object from convex/_generated/api.ts to call public functions and the internal object to call internal functions.
Paginated queries should use the paginationOptsValidator and return objects with page, isDone, and continueCursor properties.
v.bigint() is deprecated for representing signed 64-bit integers. Use v.int64() instead.
Use v.record() for defining a record type. v.map() and v.set() are not supported.
Index fi...

Files:

  • convex/langchain/index.ts
  • convex/langchain/tools/mcpTools.ts
convex/**/*

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

Convex uses file-based routing; organize files with public query, mutation, or action functions within the convex/ directory.

Files:

  • convex/langchain/index.ts
  • convex/langchain/tools/mcpTools.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/convex_rules.mdc)

**/*.ts: You can use the helper TypeScript type Id imported from './_generated/dataModel' to get the type of the id for a given table.
If you need to define a Record, provide the correct key and value types (e.g., Record<Id<'users'>, string>).
Be strict with types, particularly around ids of documents. Use Id<'table'> rather than string for document ids.
Always use as const for string literals in discriminated union types.
When using the Array type, always define arrays as const array: Array = [...];
When using the Record type, always define records as const record: Record<KeyType, ValueType> = {...};

Files:

  • convex/langchain/index.ts
  • convex/langchain/tools/mcpTools.ts
🧠 Learnings (3)
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : ALWAYS include argument and return validators for all Convex functions (query, internalQuery, mutation, internalMutation, action, internalAction). If a function doesn't return anything, include returns: v.null() as its output validator.

Applied to files:

  • convex/langchain/index.ts
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : ALWAYS use the new function syntax for Convex functions (using query/mutation/internalQuery/internalMutation/action/internalAction with args, returns, and handler).

Applied to files:

  • convex/langchain/index.ts
📚 Learning: 2025-08-01T07:34:38.459Z
Learnt from: CR
PR: 0bs-chat/zerobs#0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-08-01T07:34:38.459Z
Learning: Applies to convex/**/*.{ts,js} : Always use the v.null() validator when returning a null value from a Convex function.

Applied to files:

  • convex/langchain/index.ts
🧬 Code graph analysis (1)
convex/langchain/index.ts (2)
convex/langchain/state.ts (3)
  • ToolChunkGroup (70-77)
  • GraphState (50-62)
  • AIChunkGroup (64-68)
convex/chatMessages/helpers.ts (2)
  • getThreadFromMessage (235-251)
  • processBufferToMessages (331-340)
🔇 Additional comments (2)
package.json (2)

135-135: LGTM: @types/node present

Meets the guideline to include Node typings when using Node APIs.


146-146: TypeScript 5.9 beta: validate toolchain compatibility before merging
Beta compilers can break Vite/TS plugins and CI. Ensure dependencies are installed and the repo builds cleanly across environments.
Run:

pnpm install --frozen-lockfile
pnpm run build       # or npm run build
npx tsc --noEmit

Comment on lines 124 to +131
export const chat = action({
args: v.object({
chatId: v.id("chats"),
model: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const { chatId } = args;
const prep = await ctx.runMutation(internal.langchain.utils.prepareChat, {
chatId,
model: args.model,
});
const { chat, message, messages, customPrompt } = prep!;
const thread = getThreadFromMessage(message, messages);

const modelConfig = models.find((m) => m.model_name === chat.model);
const multiplier = modelConfig?.usageRateMultiplier ?? 1.0;

// Check usage limits before processing chat, accounting for multiplier
const usageCheck = await checkInternal(
chat.userId!,
"messages",
multiplier,
);
if (!usageCheck.allowed) {
throw new Error(
`Message limit exceeded. ${usageCheck.message || "Please upgrade your plan to send more messages."}`,
);
}

const checkpointer = new MemorySaver();
const agent = agentGraph.compile({ checkpointer });

const abort = new AbortController();
const stream = agent.streamEvents(
{ messages: thread.map((m) => m.message) },
{
version: "v2",
configurable: { ctx, chat, customPrompt, thread_id: chatId },
recursionLimit: 30,
signal: abort.signal,
},
);

let streamDoc: Doc<"streams"> | null = null;
let buffer: string[] = [];
let accumulatedBuffer: string[] = [];
let checkpoint: typeof GraphState.State | null = null;
let finalMessages: BaseMessage[] | null = null;
let finished = false;
let hadError = false;

const flushAndStream = async (): Promise<
typeof GraphState.State | null
> => {
let localCheckpoint: typeof GraphState.State | null = null;

const flusher = async () => {
while (!finished) {
if (buffer.length > 0) {
const chunks = buffer;
buffer = [];
accumulatedBuffer = [...accumulatedBuffer, ...chunks];
streamDoc = await ctx.runMutation(
internal.streams.mutations.flush,
{
chatId,
chunks,
completedSteps: [
...(localCheckpoint?.pastSteps?.map(
(pastStep) => pastStep[0],
) ?? []),
...(localCheckpoint?.plan && localCheckpoint.plan.length > 0
? [
...(localCheckpoint.plan[0].type === "parallel"
? localCheckpoint.plan[0].data.map(
(step) => step.step,
)
: [localCheckpoint.plan[0].data.step]),
]
: []),
],
},
);
}
if (streamDoc?.status === "cancelled") {
abort.abort();
return null;
}
await new Promise((resolve) => setTimeout(resolve, 300));
}
};

const streamer = async () => {
try {
for await (const evt of stream) {
if (abort.signal.aborted) {
return;
}
localCheckpoint = (
await agent.getState({
configurable: { thread_id: chatId },
})
).values as typeof GraphState.State;

const allowedNodes = ["baseAgent", "simple", "plannerAgent"];
if (
allowedNodes.some((node) =>
evt.metadata?.checkpoint_ns?.startsWith(node),
)
) {
if (evt.event === "on_chat_model_stream") {
buffer.push(
JSON.stringify({
type: "ai",
content: evt.data?.chunk?.content ?? "",
reasoning:
evt.data?.chunk?.additional_kwargs?.reasoning_content,
} as AIChunkGroup),
);
} else if (evt.event === "on_tool_start") {
buffer.push(
JSON.stringify({
type: "tool",
toolName: evt.name,
input: evt.data?.input,
isComplete: false,
toolCallId: evt.run_id,
} as ToolChunkGroup),
);
} else if (evt.event === "on_tool_end") {
let output = evt.data?.output.content;

if (Array.isArray(output)) {
output = await Promise.all(
output.map(async (item: any) => {
if (
item.type === "image_url" &&
item.image_url &&
item.image_url.url
) {
return {
type: "image_url",
image_url: {
url: "https://t3.chat/images/noise.png",
},
};
}
return item;
}),
);
}

buffer.push(
JSON.stringify({
type: "tool",
toolName: evt.name,
input: evt.data?.input,
output,
isComplete: true,
toolCallId: evt.run_id,
} as ToolChunkGroup),
);
}
}
}
} finally {
finished = true;
}
};

await Promise.all([flusher(), streamer()]);
return localCheckpoint;
};

try {
checkpoint = await flushAndStream();
finalMessages = checkpoint?.messages!;
} catch (e) {
hadError = true;
// Create messages from accumulated buffer and combine with existing thread
const bufferMessages = processBufferToMessages(accumulatedBuffer);
finalMessages = [...thread.map((m) => m.message), ...bufferMessages];

if (abort.signal.aborted) {
// Continue processing the buffer messages even when aborted
} else {
// Update status to error but continue processing
await ctx.runMutation(internal.streams.mutations.update, {
chatId,
updates: {
completedSteps: [],
status: "error",
},
});
}
}

const newMessages = finalMessages?.slice(thread.length);
if (newMessages?.length) {
const parent: Id<"chatMessages"> | null = thread.length
? thread[thread.length - 1]._id
: null;

// Process all messages and prepare them for batch creation
const messagesToCreate: Array<{
message: string;
parentId?: Id<"chatMessages">;
}> = [];

for (const m of newMessages) {
let stored = mapChatMessagesToStoredMessages([m])[0];

if (m instanceof ToolMessage && Array.isArray(m.content)) {
const patched = await Promise.all(
m.content.map(async (item) => {
if (
item.type === "image_url" &&
item.image_url?.url?.startsWith("data:")
) {
const [, mime, base64] =
item.image_url.url.match(/^data:(.+);base64,(.+)$/) ?? [];
const blob = await (
await fetch(`data:${mime};base64,${base64}`)
).blob();
const key = await ctx.storage.store(blob);
const docId = await ctx.runMutation(
api.documents.mutations.create,
{
name: "Image Upload - " + new Date().toISOString(),
type: "file",
key,
size: blob.size,
},
);
return { type: "file", file: { file_id: docId } };
}
return item;
}),
);
stored = {
...stored,
data: { ...stored.data, content: JSON.stringify(patched) },
};
}

messagesToCreate.push({
message: JSON.stringify(stored),
parentId: parent ?? undefined,
});
}

// Create all messages in a single batch operation
if (messagesToCreate.length > 0) {
await ctx.runMutation(internal.chats.mutations.createRaw, {
chatId,
messages: messagesToCreate,
});
}
}

// Only update to "done" if there was no error and we weren't aborted
if (!hadError && !abort.signal.aborted) {
await ctx.runMutation(internal.streams.mutations.update, {
chatId,
updates: { status: "done", completedSteps: [] },
});
}

// Track message usage - count the number of new messages created
if (newMessages?.length) {
// Apply multiplier and round to nearest integer
const usageValue = Math.round(newMessages.length * multiplier);

await trackInternal(chat.userId!, "messages", usageValue);
}
},
args: v.object({
chatId: v.id("chats"),
model: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const { chatId } = args;
const prep = await ctx.runMutation(internal.langchain.utils.prepareChat, {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Convex guideline: add returns: v.null() to chat action

Required by repo rules for all Convex functions.

Apply:

 export const chat = action({
   args: v.object({
     chatId: v.id("chats"),
     model: v.optional(v.string()),
   }),
+  returns: v.null(),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const chat = action({
args: v.object({
chatId: v.id("chats"),
model: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const { chatId } = args;
const prep = await ctx.runMutation(internal.langchain.utils.prepareChat, {
chatId,
model: args.model,
});
const { chat, message, messages, customPrompt } = prep!;
const thread = getThreadFromMessage(message, messages);
const modelConfig = models.find((m) => m.model_name === chat.model);
const multiplier = modelConfig?.usageRateMultiplier ?? 1.0;
// Check usage limits before processing chat, accounting for multiplier
const usageCheck = await checkInternal(
chat.userId!,
"messages",
multiplier,
);
if (!usageCheck.allowed) {
throw new Error(
`Message limit exceeded. ${usageCheck.message || "Please upgrade your plan to send more messages."}`,
);
}
const checkpointer = new MemorySaver();
const agent = agentGraph.compile({ checkpointer });
const abort = new AbortController();
const stream = agent.streamEvents(
{ messages: thread.map((m) => m.message) },
{
version: "v2",
configurable: { ctx, chat, customPrompt, thread_id: chatId },
recursionLimit: 30,
signal: abort.signal,
},
);
let streamDoc: Doc<"streams"> | null = null;
let buffer: string[] = [];
let accumulatedBuffer: string[] = [];
let checkpoint: typeof GraphState.State | null = null;
let finalMessages: BaseMessage[] | null = null;
let finished = false;
let hadError = false;
const flushAndStream = async (): Promise<
typeof GraphState.State | null
> => {
let localCheckpoint: typeof GraphState.State | null = null;
const flusher = async () => {
while (!finished) {
if (buffer.length > 0) {
const chunks = buffer;
buffer = [];
accumulatedBuffer = [...accumulatedBuffer, ...chunks];
streamDoc = await ctx.runMutation(
internal.streams.mutations.flush,
{
chatId,
chunks,
completedSteps: [
...(localCheckpoint?.pastSteps?.map(
(pastStep) => pastStep[0],
) ?? []),
...(localCheckpoint?.plan && localCheckpoint.plan.length > 0
? [
...(localCheckpoint.plan[0].type === "parallel"
? localCheckpoint.plan[0].data.map(
(step) => step.step,
)
: [localCheckpoint.plan[0].data.step]),
]
: []),
],
},
);
}
if (streamDoc?.status === "cancelled") {
abort.abort();
return null;
}
await new Promise((resolve) => setTimeout(resolve, 300));
}
};
const streamer = async () => {
try {
for await (const evt of stream) {
if (abort.signal.aborted) {
return;
}
localCheckpoint = (
await agent.getState({
configurable: { thread_id: chatId },
})
).values as typeof GraphState.State;
const allowedNodes = ["baseAgent", "simple", "plannerAgent"];
if (
allowedNodes.some((node) =>
evt.metadata?.checkpoint_ns?.startsWith(node),
)
) {
if (evt.event === "on_chat_model_stream") {
buffer.push(
JSON.stringify({
type: "ai",
content: evt.data?.chunk?.content ?? "",
reasoning:
evt.data?.chunk?.additional_kwargs?.reasoning_content,
} as AIChunkGroup),
);
} else if (evt.event === "on_tool_start") {
buffer.push(
JSON.stringify({
type: "tool",
toolName: evt.name,
input: evt.data?.input,
isComplete: false,
toolCallId: evt.run_id,
} as ToolChunkGroup),
);
} else if (evt.event === "on_tool_end") {
let output = evt.data?.output.content;
if (Array.isArray(output)) {
output = await Promise.all(
output.map(async (item: any) => {
if (
item.type === "image_url" &&
item.image_url &&
item.image_url.url
) {
return {
type: "image_url",
image_url: {
url: "https://t3.chat/images/noise.png",
},
};
}
return item;
}),
);
}
buffer.push(
JSON.stringify({
type: "tool",
toolName: evt.name,
input: evt.data?.input,
output,
isComplete: true,
toolCallId: evt.run_id,
} as ToolChunkGroup),
);
}
}
}
} finally {
finished = true;
}
};
await Promise.all([flusher(), streamer()]);
return localCheckpoint;
};
try {
checkpoint = await flushAndStream();
finalMessages = checkpoint?.messages!;
} catch (e) {
hadError = true;
// Create messages from accumulated buffer and combine with existing thread
const bufferMessages = processBufferToMessages(accumulatedBuffer);
finalMessages = [...thread.map((m) => m.message), ...bufferMessages];
if (abort.signal.aborted) {
// Continue processing the buffer messages even when aborted
} else {
// Update status to error but continue processing
await ctx.runMutation(internal.streams.mutations.update, {
chatId,
updates: {
completedSteps: [],
status: "error",
},
});
}
}
const newMessages = finalMessages?.slice(thread.length);
if (newMessages?.length) {
const parent: Id<"chatMessages"> | null = thread.length
? thread[thread.length - 1]._id
: null;
// Process all messages and prepare them for batch creation
const messagesToCreate: Array<{
message: string;
parentId?: Id<"chatMessages">;
}> = [];
for (const m of newMessages) {
let stored = mapChatMessagesToStoredMessages([m])[0];
if (m instanceof ToolMessage && Array.isArray(m.content)) {
const patched = await Promise.all(
m.content.map(async (item) => {
if (
item.type === "image_url" &&
item.image_url?.url?.startsWith("data:")
) {
const [, mime, base64] =
item.image_url.url.match(/^data:(.+);base64,(.+)$/) ?? [];
const blob = await (
await fetch(`data:${mime};base64,${base64}`)
).blob();
const key = await ctx.storage.store(blob);
const docId = await ctx.runMutation(
api.documents.mutations.create,
{
name: "Image Upload - " + new Date().toISOString(),
type: "file",
key,
size: blob.size,
},
);
return { type: "file", file: { file_id: docId } };
}
return item;
}),
);
stored = {
...stored,
data: { ...stored.data, content: JSON.stringify(patched) },
};
}
messagesToCreate.push({
message: JSON.stringify(stored),
parentId: parent ?? undefined,
});
}
// Create all messages in a single batch operation
if (messagesToCreate.length > 0) {
await ctx.runMutation(internal.chats.mutations.createRaw, {
chatId,
messages: messagesToCreate,
});
}
}
// Only update to "done" if there was no error and we weren't aborted
if (!hadError && !abort.signal.aborted) {
await ctx.runMutation(internal.streams.mutations.update, {
chatId,
updates: { status: "done", completedSteps: [] },
});
}
// Track message usage - count the number of new messages created
if (newMessages?.length) {
// Apply multiplier and round to nearest integer
const usageValue = Math.round(newMessages.length * multiplier);
await trackInternal(chat.userId!, "messages", usageValue);
}
},
args: v.object({
chatId: v.id("chats"),
model: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const { chatId } = args;
const prep = await ctx.runMutation(internal.langchain.utils.prepareChat, {
export const chat = action({
args: v.object({
chatId: v.id("chats"),
model: v.optional(v.string()),
}),
returns: v.null(),
handler: async (ctx, args) => {
const { chatId } = args;
const prep = await ctx.runMutation(internal.langchain.utils.prepareChat, {
// ...
});
// ...
},
});
🤖 Prompt for AI Agents
In convex/langchain/index.ts around lines 124 to 131, the chat action is missing
the required Convex schema for the return type; add returns: v.null() to the
action definition alongside args (i.e., change action({ args: ..., handler: ...
}) to action({ args: ..., returns: v.null(), handler: ... })) so the function
conforms to the repo Convex guideline.

Comment on lines +541 to +552
args: v.object({
chatId: v.id("chats"),
branchFrom: v.id("chatMessages"),
model: v.optional(v.string()),
editedContent: v.optional(
v.object({
text: v.optional(v.string()),
documents: v.optional(v.array(v.id("documents"))),
}),
),
}),
handler: async (ctx, args): Promise<{ newChatId: Id<"chats"> }> => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Convex guideline: declare returns schema for branchChat

The handler returns { newChatId }. Add the return validator.

Apply:

 export const branchChat = action({
   args: v.object({
     chatId: v.id("chats"),
     branchFrom: v.id("chatMessages"),
     model: v.optional(v.string()),
     editedContent: v.optional(
       v.object({
         text: v.optional(v.string()),
         documents: v.optional(v.array(v.id("documents"))),
       }),
     ),
   }),
+  returns: v.object({ newChatId: v.id("chats") }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
args: v.object({
chatId: v.id("chats"),
branchFrom: v.id("chatMessages"),
model: v.optional(v.string()),
editedContent: v.optional(
v.object({
text: v.optional(v.string()),
documents: v.optional(v.array(v.id("documents"))),
}),
),
}),
handler: async (ctx, args): Promise<{ newChatId: Id<"chats"> }> => {
export const branchChat = action({
args: v.object({
chatId: v.id("chats"),
branchFrom: v.id("chatMessages"),
model: v.optional(v.string()),
editedContent: v.optional(
v.object({
text: v.optional(v.string()),
documents: v.optional(v.array(v.id("documents"))),
}),
),
}),
returns: v.object({ newChatId: v.id("chats") }),
handler: async (ctx, args): Promise<{ newChatId: Id<"chats"> }> => {
// ...existing implementation...
},
});
🤖 Prompt for AI Agents
In convex/langchain/index.ts around lines 541 to 552, the handler for branchChat
declares args but lacks a return schema; add a returns validator to the
procedure definition that validates the returned shape, e.g. set returns:
v.object({ newChatId: v.id("chats") }) alongside the existing args, so the
handler's returned { newChatId } is validated.

Comment on lines +77 to +86
const machine = await fly.getMachineByName(appDoc?._id, "machine");

try {
await fly.startMachine(appDoc?._id, machine?.id!);
} catch (error) {}
await fly.waitTillHealthy(appDoc?._id, machine?.id!, {
timeout: 120000,
interval: 1000,
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard Fly machine lifecycle to stdio/docker and avoid unsafe non-null assertions

Unconditionally calling startMachine/waitTillHealthy for HTTP MCPs can error/hang, and machine?.id! risks runtime undefined usage.

Apply:

-				const machine = await fly.getMachineByName(appDoc?._id, "machine");
-
-				try {
-					await fly.startMachine(appDoc?._id, machine?.id!);
-				} catch (error) {}
-				await fly.waitTillHealthy(appDoc?._id, machine?.id!, {
-					timeout: 120000,
-					interval: 1000,
-				});
+				const machine =
+					["stdio", "docker"].includes(mcp.type)
+						? await fly.getMachineByName(appDoc._id, "machine")
+						: null;
+
+				if (["stdio", "docker"].includes(mcp.type)) {
+					if (!machine?.id) {
+						console.error(`MCP ${mcp._id} missing Fly machine id`);
+						return null;
+					}
+					try {
+						await fly.startMachine(appDoc._id, machine.id);
+					} catch (error) {}
+					await fly.waitTillHealthy(appDoc._id, machine.id, {
+						timeout: 120000,
+						interval: 1000,
+					});
+				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const machine = await fly.getMachineByName(appDoc?._id, "machine");
try {
await fly.startMachine(appDoc?._id, machine?.id!);
} catch (error) {}
await fly.waitTillHealthy(appDoc?._id, machine?.id!, {
timeout: 120000,
interval: 1000,
});
const machine =
["stdio", "docker"].includes(mcp.type)
? await fly.getMachineByName(appDoc._id, "machine")
: null;
if (["stdio", "docker"].includes(mcp.type)) {
if (!machine?.id) {
console.error(`MCP ${mcp._id} missing Fly machine id`);
return null;
}
try {
await fly.startMachine(appDoc._id, machine.id);
} catch (error) {}
await fly.waitTillHealthy(appDoc._id, machine.id, {
timeout: 120000,
interval: 1000,
});
}
🤖 Prompt for AI Agents
In convex/langchain/tools/mcpTools.ts around lines 77 to 86, the code
unconditionally starts/waits on a Fly machine and uses the unsafe non-null
assertion machine?.id!, which can throw or hang for HTTP (MCP) types; update the
logic to first verify the MCP runtime type is one that needs a process (e.g.,
"stdio" or "docker"), then ensure machine is defined before calling
startMachine/waitTillHealthy (avoid ! and use explicit checks), wrap start/wait
in try/catch that logs/handles errors and short-circuits if machine is missing,
and skip start/wait entirely for HTTP MCPs to prevent hangs and runtime
undefined access.

Comment on lines +125 to +133
if (attempt >= 5) {
try {
await fly.startMachine(appDoc._id, machine?.id!);
} catch (error) {}
await fly.waitTillHealthy(appDoc._id, machine?.id!, {
timeout: 120000,
interval: 1000,
});
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Retry block must also respect transport and machine id

Re-issuing Fly ops on HTTP MCPs after 5 attempts can still break the flow.

Apply:

-						if (attempt >= 5) {
-							try {
-								await fly.startMachine(appDoc._id, machine?.id!);
-							} catch (error) {}
-							await fly.waitTillHealthy(appDoc._id, machine?.id!, {
-								timeout: 120000,
-								interval: 1000,
-							});
-						}
+						if (
+							attempt >= 5 &&
+							["stdio", "docker"].includes(mcp.type) &&
+							machine?.id
+						) {
+							try {
+								await fly.startMachine(appDoc._id, machine.id);
+							} catch (error) {}
+							await fly.waitTillHealthy(appDoc._id, machine.id, {
+								timeout: 120000,
+								interval: 1000,
+							});
+						}
🤖 Prompt for AI Agents
In convex/langchain/tools/mcpTools.ts around lines 125-133, the retry branch
unconditionally re-issues Fly operations after 5 attempts which can break
HTTP-based MCPs and also uses machine?.id! without respecting transport; update
the logic to only call fly.startMachine and fly.waitTillHealthy when machine.id
exists and transport is not 'http' (or the equivalent HTTP check used
elsewhere), pass the correct machine.id and transport into those Fly helper
calls if they accept transport, and handle the catch by surfacing/logging the
error instead of swallowing it so failures are visible.

Comment on lines +226 to +236
await Promise.all(
validClientsAndTools.map(async ({ mcp, appDoc, machine }) => {
if (
["stdio", "docker"].includes(mcp.type) &&
files.length > 0 &&
machine?.id
) {
await fly.uploadFileToMachine(appDoc._id, machine.id, files);
}
}),
);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Harden uploads: settle promises and isolate failures per MCP

A single upload failure currently fails all uploads.

Apply:

-		await Promise.all(
-			validClientsAndTools.map(async ({ mcp, appDoc, machine }) => {
+		await Promise.allSettled(
+			validClientsAndTools.map(async ({ mcp, appDoc, machine }) => {
 				if (
 					["stdio", "docker"].includes(mcp.type) &&
 					files.length > 0 &&
 					machine?.id
 				) {
-					await fly.uploadFileToMachine(appDoc._id, machine.id, files);
+					try {
+						await fly.uploadFileToMachine(appDoc._id, machine.id, files);
+					} catch (error) {
+						console.warn(
+							`uploadFileToMachine failed for ${mcp._id}/${machine.id}:`,
+							error,
+						);
+					}
 				}
 			}),
 		);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await Promise.all(
validClientsAndTools.map(async ({ mcp, appDoc, machine }) => {
if (
["stdio", "docker"].includes(mcp.type) &&
files.length > 0 &&
machine?.id
) {
await fly.uploadFileToMachine(appDoc._id, machine.id, files);
}
}),
);
// Harden uploads: settle promises and isolate failures per MCP
await Promise.allSettled(
validClientsAndTools.map(async ({ mcp, appDoc, machine }) => {
if (
["stdio", "docker"].includes(mcp.type) &&
files.length > 0 &&
machine?.id
) {
try {
await fly.uploadFileToMachine(appDoc._id, machine.id, files);
} catch (error) {
console.warn(
`uploadFileToMachine failed for ${mcp._id}/${machine.id}:`,
error,
);
}
}
}),
);
🤖 Prompt for AI Agents
In convex/langchain/tools/mcpTools.ts around lines 226 to 236, the current
Promise.all call lets one upload failure reject the whole batch; change to
isolate failures per MCP by mapping validClientsAndTools to per-MCP async tasks
that wrap the upload in try/catch (or use Promise.allSettled) so each upload is
awaited independently, log or record the error with the relevant
mcp/appDoc/machine identifiers, and ensure other uploads continue even if one
fails.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant