diff --git a/CHANGELOG.md b/CHANGELOG.md index 70bf260296..5d47e29653 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,12 @@ and this project adheres to [#3702](https://github.com/OpenFn/lightning/issues/3702) - Reintroduce the impeded project with hopefully better performance characteristics [#3542](https://github.com/OpenFn/lightning/issues/3542) +- **AI Assistant Streaming**: AI responses now stream in real-time with status updates + - Users see AI responses appear word-by-word as they're generated + - Status indicators show thinking progress (e.g., "Researching...", "Generating code...") + - Automatic error recovery with retry/cancel options + - Configurable timeout based on Apollo settings + [#3585](https://github.com/OpenFn/lightning/issues/3585) ### Changed @@ -66,6 +72,13 @@ and this project adheres to unauthorized edits when user roles change during active collaboration sessions [#3749](https://github.com/OpenFn/lightning/issues/3749) +### Technical + +- Added `Lightning.ApolloClient.SSEStream` for Server-Sent Events handling +- Enhanced `MessageProcessor` to support streaming responses +- Updated AI Assistant component with real-time markdown rendering +- Improved error handling for network failures and timeouts + ## [2.14.11] - 2025-10-15 ## [2.14.11-pre1] - 2025-10-15 diff --git a/assets/js/hooks/index.ts b/assets/js/hooks/index.ts index 4f4e2c82ae..fa9c50b103 100644 --- a/assets/js/hooks/index.ts +++ b/assets/js/hooks/index.ts @@ -4,6 +4,7 @@ import tippy, { } from 'tippy.js'; import { format, formatRelative } from 'date-fns'; import { enUS } from 'date-fns/locale'; +import { marked } from 'marked'; import type { PhoenixHook } from './PhoenixHook'; import LogLineHighlight from './LogLineHighlight'; @@ -684,9 +685,38 @@ export const BlurDataclipEditor = { export const ScrollToMessage = { mounted() { + this.shouldAutoScroll = true; + + this.handleScrollThrottled = this.throttle(() => { + const isAtBottom = this.isAtBottom(); + this.shouldAutoScroll = isAtBottom; + }, 100); + + this.el.addEventListener('scroll', this.handleScrollThrottled); this.handleScroll(); }, + destroyed() { + if (this.handleScrollThrottled) { + this.el.removeEventListener('scroll', this.handleScrollThrottled); + } + if (this.throttleTimeout !== undefined) { + clearTimeout(this.throttleTimeout); + } + }, + + throttle(func: () => void, wait: number): () => void { + return () => { + if (this.throttleTimeout !== undefined) { + clearTimeout(this.throttleTimeout); + } + this.throttleTimeout = setTimeout(() => { + func(); + this.throttleTimeout = undefined; + }, wait) as unknown as number; + }; + }, + updated() { this.handleScroll(); }, @@ -696,7 +726,7 @@ export const ScrollToMessage = { if (targetMessageId) { this.scrollToSpecificMessage(targetMessageId); - } else { + } else if (this.shouldAutoScroll) { this.scrollToBottom(); } }, @@ -717,18 +747,25 @@ export const ScrollToMessage = { } }, + isAtBottom() { + const threshold = 50; + const position = this.el.scrollTop + this.el.clientHeight; + const height = this.el.scrollHeight; + return height - position <= threshold; + }, + scrollToBottom() { - setTimeout(() => { - this.el.scrollTo({ - top: this.el.scrollHeight, - behavior: 'smooth', - }); - }, 600); + this.el.scrollTop = this.el.scrollHeight; }, } as PhoenixHook<{ + shouldAutoScroll: boolean; + handleScrollThrottled?: () => void; + throttleTimeout?: number; + throttle: (func: () => void, wait: number) => () => void; handleScroll: () => void; scrollToSpecificMessage: (messageId: string) => void; 
scrollToBottom: () => void; + isAtBottom: () => boolean; }>; export const Copy = { @@ -1020,3 +1057,95 @@ export const LocalTimeConverter = { convertDateTime: () => void; convertToDisplayTime: (isoTimestamp: string, display: string) => void; }>; + +export const StreamingText = { + mounted() { + this.lastContent = ''; + this.renderer = this.createCustomRenderer(); + this.parseCount = 0; + this.pendingUpdate = undefined; + this.updateContent(); + }, + + updated() { + // Debounce updates by 50ms to batch rapid chunk arrivals + if (this.pendingUpdate !== undefined) { + clearTimeout(this.pendingUpdate); + } + + this.pendingUpdate = setTimeout(() => { + this.updateContent(); + this.pendingUpdate = undefined; + }, 50) as unknown as number; + }, + + destroyed() { + if (this.pendingUpdate !== undefined) { + clearTimeout(this.pendingUpdate); + } + }, + + createCustomRenderer() { + const renderer = new marked.Renderer(); + + renderer.code = (code, language) => { + const lang = language ? ` class="${language}"` : ''; + return `
<pre><code${lang}>${code}</code></pre>`;
+ };
+
+ renderer.link = (href, title, text) => {
+ return `<a href="${href}"${title ? ` title="${title}"` : ''}>${text}</a>`;
+ };
+
+ renderer.heading = (text, level) => {
+ const classes = level === 1 ? 'text-2xl font-bold mb-6' : 'text-xl font-semibold mb-4 mt-8';
+ return `<h${level} class="${classes}">${text}</h${level}>
`; + }; + + return renderer; + }, + + updateContent() { + const start = performance.now(); + const newContent = this.el.dataset.streamingContent || ''; + + if (newContent !== this.lastContent) { + this.parseCount++; + + const htmlContent = marked.parse(newContent, { + renderer: this.renderer, + breaks: true, + gfm: true, + }); + + this.el.innerHTML = htmlContent; + this.lastContent = newContent; + + const duration = performance.now() - start; + console.debug( + `[StreamingText] Parse #${this.parseCount}: ${duration.toFixed(2)}ms for ${newContent.length} chars` + ); + } + }, +} as PhoenixHook<{ + lastContent: string; + renderer: marked.Renderer; + parseCount: number; + pendingUpdate?: number; + createCustomRenderer: () => marked.Renderer; + updateContent: () => void; +}>; diff --git a/lib/lightning/ai_assistant/ai_assistant.ex b/lib/lightning/ai_assistant/ai_assistant.ex index 8c9389f592..6e2fb714fd 100644 --- a/lib/lightning/ai_assistant/ai_assistant.ex +++ b/lib/lightning/ai_assistant/ai_assistant.ex @@ -569,12 +569,16 @@ defmodule Lightning.AiAssistant do ## Returns - List of `%ChatMessage{}` structs with `:role` of `:user` and `:status` of `:pending`. + List of `%ChatMessage{}` structs with `:role` of `:user` and `:status` of `:pending` or `:processing`. """ @spec find_pending_user_messages(ChatSession.t()) :: [ChatMessage.t()] def find_pending_user_messages(session) do messages = session.messages || [] - Enum.filter(messages, &(&1.role == :user && &1.status == :pending)) + + Enum.filter( + messages, + &(&1.role == :user && &1.status in [:pending, :processing]) + ) end @doc """ diff --git a/lib/lightning/ai_assistant/message_processor.ex b/lib/lightning/ai_assistant/message_processor.ex index 1c5c3d6feb..80b6cf5a01 100644 --- a/lib/lightning/ai_assistant/message_processor.ex +++ b/lib/lightning/ai_assistant/message_processor.ex @@ -13,6 +13,7 @@ defmodule Lightning.AiAssistant.MessageProcessor do alias Lightning.AiAssistant alias Lightning.AiAssistant.ChatMessage alias Lightning.AiAssistant.ChatSession + alias Lightning.ApolloClient.SSEStream alias Lightning.Repo require Logger @@ -38,11 +39,11 @@ defmodule Lightning.AiAssistant.MessageProcessor do @impl Oban.Worker @spec perform(Oban.Job.t()) :: :ok def perform(%Oban.Job{args: %{"message_id" => message_id}}) do - Logger.info("[MessageProcessor] Processing message: #{message_id}") + Logger.debug("[MessageProcessor] Processing message: #{message_id}") case process_message(message_id) do {:ok, _updated_session} -> - Logger.info( + Logger.debug( "[MessageProcessor] Successfully processed message: #{message_id}" ) @@ -77,7 +78,7 @@ defmodule Lightning.AiAssistant.MessageProcessor do @doc false @spec process_message(String.t()) :: - {:ok, AiAssistant.ChatSession.t()} | {:error, String.t()} + {:ok, AiAssistant.ChatSession.t() | :streaming} | {:error, String.t()} defp process_message(message_id) do {:ok, session, message} = ChatMessage @@ -94,6 +95,9 @@ defmodule Lightning.AiAssistant.MessageProcessor do end case result do + {:ok, :streaming} -> + {:ok, session} + {:ok, _} -> {:ok, updated_session, _updated_message} = update_message_status(message, :success) @@ -110,7 +114,7 @@ defmodule Lightning.AiAssistant.MessageProcessor do @doc false @spec process_job_message(AiAssistant.ChatSession.t(), ChatMessage.t()) :: - {:ok, AiAssistant.ChatSession.t()} | {:error, String.t()} + {:ok, :streaming | AiAssistant.ChatSession.t()} | {:error, String.t()} defp process_job_message(session, message) do enriched_session = 
AiAssistant.enrich_session_with_job_context(session) @@ -123,15 +127,148 @@ defmodule Lightning.AiAssistant.MessageProcessor do [] end - AiAssistant.query(enriched_session, message.content, options) + stream_job_message(enriched_session, message.content, options) + end + + @doc false + @spec stream_job_message(AiAssistant.ChatSession.t(), String.t(), keyword()) :: + {:ok, :streaming | AiAssistant.ChatSession.t()} | {:error, String.t()} + defp stream_job_message(session, content, options) do + start_streaming_request(session, content, options) + {:ok, :streaming} + rescue + _ -> + AiAssistant.query(session, content, options) + end + + @doc false + @spec start_streaming_request( + AiAssistant.ChatSession.t(), + String.t(), + keyword() + ) :: :ok + defp start_streaming_request(session, content, options) do + context = build_context(session, options) + history = get_chat_history(session) + + payload = %{ + "api_key" => Lightning.Config.apollo(:ai_assistant_api_key), + "content" => content, + "context" => context, + "history" => history, + "meta" => session.meta || %{}, + "stream" => true + } + + sse_payload = Map.put(payload, "lightning_session_id", session.id) + apollo_url = get_apollo_url("job_chat") + + case SSEStream.start_stream(apollo_url, sse_payload) do + {:ok, _pid} -> + Logger.debug( + "[MessageProcessor] Started Apollo SSE stream for session #{session.id}" + ) + + {:error, reason} -> + Logger.error( + "[MessageProcessor] Failed to start Apollo stream: #{inspect(reason)}" + ) + + Logger.debug("[MessageProcessor] Falling back to HTTP client") + raise "SSE stream failed, falling back to HTTP (not implemented yet)" + end + + :ok + end + + defp get_apollo_url(service) do + "#{Lightning.Config.apollo(:endpoint)}/services/#{service}/stream" + end + + defp get_chat_history(session) do + session.messages + |> Enum.map(fn message -> + %{ + "role" => to_string(message.role), + "content" => message.content + } + end) + end + + defp build_context(session, options) do + base_context = %{ + expression: session.expression, + adaptor: session.adaptor, + log: session.logs + } + + Enum.reduce(options, base_context, fn + {:code, false}, acc -> + Map.drop(acc, [:expression]) + + {:logs, false}, acc -> + Map.drop(acc, [:log]) + + _opt, acc -> + acc + end) end @doc false @spec process_workflow_message(AiAssistant.ChatSession.t(), ChatMessage.t()) :: - {:ok, AiAssistant.ChatSession.t()} | {:error, String.t()} + {:ok, :streaming | AiAssistant.ChatSession.t()} | {:error, String.t()} defp process_workflow_message(session, message) do code = message.code || workflow_code_from_session(session) - AiAssistant.query_workflow(session, message.content, code: code) + + try do + start_workflow_streaming_request(session, message.content, code) + {:ok, :streaming} + rescue + _ -> + AiAssistant.query_workflow(session, message.content, code: code) + end + end + + @doc false + @spec start_workflow_streaming_request( + AiAssistant.ChatSession.t(), + String.t(), + String.t() | nil + ) :: :ok + defp start_workflow_streaming_request(session, content, code) do + history = get_chat_history(session) + + payload = + %{ + "api_key" => Lightning.Config.apollo(:ai_assistant_api_key), + "content" => content, + "existing_yaml" => code, + "history" => history, + "meta" => session.meta || %{}, + "stream" => true + } + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Enum.into(%{}) + + sse_payload = Map.put(payload, "lightning_session_id", session.id) + apollo_url = get_apollo_url("workflow_chat") + + case 
SSEStream.start_stream(apollo_url, sse_payload) do + {:ok, _pid} -> + Logger.debug( + "[MessageProcessor] Started Apollo SSE stream for workflow session #{session.id}" + ) + + {:error, reason} -> + Logger.error( + "[MessageProcessor] Failed to start Apollo workflow stream: #{inspect(reason)}" + ) + + Logger.debug("[MessageProcessor] Falling back to HTTP client") + raise "SSE stream failed, triggering fallback to HTTP" + end + + :ok end @doc false @@ -248,7 +385,7 @@ defmodule Lightning.AiAssistant.MessageProcessor do |> case do %ChatMessage{id: message_id, status: status} = message when status in [:pending, :processing] -> - Logger.info( + Logger.debug( "[AI Assistant] Updating message #{message_id} to error status after exception" ) @@ -325,7 +462,7 @@ defmodule Lightning.AiAssistant.MessageProcessor do |> case do %ChatMessage{id: message_id, status: status} = message when status in [:pending, :processing] -> - Logger.info( + Logger.debug( "[AI Assistant] Updating message #{message_id} to error status after stop=#{other}" ) diff --git a/lib/lightning/apollo_client/sse_stream.ex b/lib/lightning/apollo_client/sse_stream.ex new file mode 100644 index 0000000000..da2f06ec39 --- /dev/null +++ b/lib/lightning/apollo_client/sse_stream.ex @@ -0,0 +1,288 @@ +defmodule Lightning.ApolloClient.SSEStream do + @moduledoc """ + Server-Sent Events (SSE) client for streaming AI responses from Apollo server. + + This module handles HTTP streaming connections to Apollo's SSE endpoints, + parsing incoming events and forwarding them to the appropriate channels. + """ + use GenServer + + require Logger + + @doc """ + Starts a streaming SSE connection to Apollo server. + + ## Parameters + + - `url` - HTTP URL for Apollo streaming endpoint + - `payload` - Request payload to send to Apollo + + ## Returns + + - `{:ok, pid}` - SSE stream process started successfully + - `{:error, reason}` - Failed to establish connection + """ + def start_stream(url, payload) do + GenServer.start_link(__MODULE__, {url, payload}) + end + + @impl GenServer + def init({url, payload}) do + lightning_session_id = payload["lightning_session_id"] + apollo_payload = Map.delete(payload, "lightning_session_id") + + apollo_timeout = Lightning.Config.apollo(:timeout) || 30_000 + stream_timeout = apollo_timeout + 10_000 + + timeout_ref = Process.send_after(self(), :stream_timeout, stream_timeout) + + parent = self() + + spawn_link(fn -> + stream_request(url, apollo_payload, parent, lightning_session_id) + end) + + {:ok, + %{ + session_id: lightning_session_id, + timeout_ref: timeout_ref, + completed: false + }} + end + + @impl GenServer + def handle_info({:sse_event, event_type, data}, state) do + handle_sse_event(event_type, data, state) + {:noreply, state} + end + + def handle_info(:stream_timeout, %{completed: false} = state) do + Logger.error("[SSEStream] Stream timeout for session #{state.session_id}") + broadcast_error(state.session_id, "Request timed out. 
Please try again.") + {:stop, :timeout, state} + end + + def handle_info(:stream_timeout, state) do + {:noreply, state} + end + + def handle_info({:sse_complete}, state) do + if state.timeout_ref, do: Process.cancel_timer(state.timeout_ref) + Logger.info("[SSEStream] Stream completed for session #{state.session_id}") + {:stop, :normal, %{state | completed: true}} + end + + def handle_info({:sse_error, reason}, state) do + if state.timeout_ref, do: Process.cancel_timer(state.timeout_ref) + + Logger.error( + "[SSEStream] Stream error for session #{state.session_id}: #{inspect(reason)}" + ) + + error_message = + case reason do + :timeout -> "Connection timed out" + :closed -> "Connection closed unexpectedly" + {:shutdown, _} -> "Server shut down" + {:http_error, status} -> "Server returned error status #{status}" + _ -> "Connection error: #{inspect(reason)}" + end + + broadcast_error(state.session_id, error_message) + {:stop, :normal, %{state | completed: true}} + end + + defp stream_request(url, payload, parent, session_id) do + Logger.info("[SSEStream] Starting SSE connection to #{url}") + Logger.debug("[SSEStream] Payload: #{inspect(payload)}") + + headers = [ + {"Content-Type", "application/json"}, + {"Accept", "text/event-stream"}, + {"Authorization", + "Bearer #{Lightning.Config.apollo(:ai_assistant_api_key)}"} + ] + + case Finch.build(:post, url, headers, Jason.encode!(payload)) + |> Finch.stream(Lightning.Finch, %{}, fn + {:status, status}, acc -> + Logger.debug("[SSEStream] Response status: #{status}") + + if status >= 400 do + send(parent, {:sse_error, {:http_error, status}}) + end + + Map.put(acc, :status, status) + + {:headers, headers}, acc -> + Logger.debug("[SSEStream] Response headers: #{inspect(headers)}") + acc + + {:data, chunk}, acc -> + Logger.debug("[SSEStream] Raw chunk received: #{inspect(chunk)}") + + if Map.get(acc, :status, 200) in 200..299 do + parse_sse_chunk(chunk, parent, session_id) + end + + acc + end) do + {:ok, %{status: status}} when status in 200..299 -> + Logger.info("[SSEStream] Stream completed successfully") + send(parent, {:sse_complete}) + + {:ok, %{status: status}} -> + Logger.error("[SSEStream] Stream failed with status: #{status}") + send(parent, {:sse_error, {:http_error, status}}) + + {:error, reason, _acc} -> + Logger.error( + "[SSEStream] Stream failed before response: #{inspect(reason)}" + ) + + send(parent, {:sse_error, reason}) + end + end + + defp parse_sse_chunk(chunk, parent, _session_id) do + chunk + |> String.split("\n") + |> Enum.reduce(%{event: nil, data: nil}, fn line, acc -> + cond do + String.starts_with?(line, "event:") -> + event = line |> String.trim_leading("event:") |> String.trim() + %{acc | event: event} + + String.starts_with?(line, "data:") -> + data = line |> String.trim_leading("data:") |> String.trim() + %{acc | data: data} + + (line == "" and acc.event) && acc.data -> + send(parent, {:sse_event, acc.event, acc.data}) + %{event: nil, data: nil} + + true -> + acc + end + end) + end + + defp handle_sse_event(event_type, data, state) do + case event_type do + "content_block_delta" -> + handle_content_block_delta(data, state.session_id) + + "message_stop" -> + Logger.debug("[SSEStream] Received message_stop, broadcasting complete") + broadcast_complete(state.session_id) + + "complete" -> + handle_complete_event(data, state.session_id) + + "error" -> + handle_error_event(data, state.session_id) + + "log" -> + Logger.debug("[SSEStream] Apollo log: #{inspect(data)}") + + _ -> + Logger.debug("[SSEStream] Unhandled event type: 
#{event_type}") + :ok + end + end + + defp handle_content_block_delta(data, session_id) do + case Jason.decode(data) do + {:ok, %{"delta" => %{"type" => "text_delta", "text" => text}}} -> + Logger.debug("[SSEStream] Broadcasting chunk: #{inspect(text)}") + broadcast_chunk(session_id, text) + + {:ok, %{"delta" => %{"type" => "thinking_delta", "thinking" => thinking}}} -> + Logger.debug("[SSEStream] Broadcasting status: #{inspect(thinking)}") + broadcast_status(session_id, thinking) + + _ -> + :ok + end + end + + defp handle_complete_event(data, session_id) do + Logger.debug("[SSEStream] Received complete event with payload") + + case Jason.decode(data) do + {:ok, payload} -> + Logger.debug( + "[SSEStream] Broadcasting complete payload: #{inspect(Map.keys(payload))}" + ) + + broadcast_payload_complete(session_id, payload) + + {:error, error} -> + Logger.error( + "[SSEStream] Failed to parse complete event payload: #{inspect(error)}" + ) + end + + :ok + end + + defp handle_error_event(data, session_id) do + Logger.error("[SSEStream] Received error event: #{inspect(data)}") + + error_message = + case Jason.decode(data) do + {:ok, %{"message" => msg}} -> msg + {:ok, %{"error" => err}} -> err + _ -> "An error occurred while streaming" + end + + broadcast_error(session_id, error_message) + end + + defp broadcast_chunk(session_id, data) do + Lightning.broadcast( + "ai_session:#{session_id}", + {:ai_assistant, :streaming_chunk, %{content: data, session_id: session_id}} + ) + end + + defp broadcast_status(session_id, data) do + Lightning.broadcast( + "ai_session:#{session_id}", + {:ai_assistant, :status_update, %{status: data, session_id: session_id}} + ) + end + + defp broadcast_complete(session_id) do + Lightning.broadcast( + "ai_session:#{session_id}", + {:ai_assistant, :streaming_complete, %{session_id: session_id}} + ) + end + + defp broadcast_payload_complete(session_id, payload) do + payload_data = %{ + session_id: session_id, + usage: Map.get(payload, "usage"), + meta: Map.get(payload, "meta"), + code: Map.get(payload, "response_yaml") + } + + Lightning.broadcast( + "ai_session:#{session_id}", + {:ai_assistant, :streaming_payload_complete, payload_data} + ) + end + + defp broadcast_error(session_id, error_message) do + payload_data = %{ + session_id: session_id, + error: error_message + } + + Lightning.broadcast( + "ai_session:#{session_id}", + {:ai_assistant, :streaming_error, payload_data} + ) + end +end diff --git a/lib/lightning_web/live/ai_assistant/component.ex b/lib/lightning_web/live/ai_assistant/component.ex index 1b8e5dfcac..b27a4563e8 100644 --- a/lib/lightning_web/live/ai_assistant/component.ex +++ b/lib/lightning_web/live/ai_assistant/component.ex @@ -43,7 +43,10 @@ defmodule LightningWeb.AiAssistant.Component do callbacks: %{}, selected_message: nil, registered_session_id: nil, - registered_component_id: nil + registered_component_id: nil, + streaming_content: "", + streaming_status: nil, + streaming_error: nil }) |> assign_async(:endpoint_available, fn -> {:ok, %{endpoint_available: AiAssistant.endpoint_available?()}} @@ -55,6 +58,26 @@ defmodule LightningWeb.AiAssistant.Component do {:ok, handle_message_status(status, socket)} end + def update(%{id: _id, streaming_chunk: chunk_data}, socket) do + {:ok, handle_streaming_chunk(chunk_data, socket)} + end + + def update(%{id: _id, status_update: status_data}, socket) do + {:ok, handle_status_update(status_data, socket)} + end + + def update(%{id: _id, streaming_complete: _}, socket) do + {:ok, handle_streaming_complete(socket)} + 
end + + def update(%{id: _id, streaming_payload_complete: payload_data}, socket) do + {:ok, handle_streaming_payload_complete(payload_data, socket)} + end + + def update(%{id: _id, streaming_error: error_data}, socket) do + {:ok, handle_streaming_error(error_data, socket)} + end + def update(%{action: :code_error} = assigns, socket) do {:ok, handle_code_error(socket, assigns)} end @@ -167,7 +190,9 @@ defmodule LightningWeb.AiAssistant.Component do session: session, pending_message: AsyncResult.ok(nil), selected_message: nil, - code_error: nil + code_error: nil, + streaming_content: Map.get(socket.assigns, :streaming_content, ""), + streaming_status: Map.get(socket.assigns, :streaming_status, nil) ) |> delegate_to_handler(:on_message_received, [session]) end @@ -175,7 +200,158 @@ defmodule LightningWeb.AiAssistant.Component do defp handle_message_status({:error, session}, socket) do assign(socket, session: session, - pending_message: AsyncResult.ok(nil) + pending_message: AsyncResult.ok(nil), + streaming_content: Map.get(socket.assigns, :streaming_content, ""), + streaming_status: Map.get(socket.assigns, :streaming_status, nil) + ) + end + + defp handle_streaming_chunk(chunk_data, socket) do + current_content = socket.assigns.streaming_content + new_content = current_content <> chunk_data.content + assign(socket, streaming_content: new_content) + end + + defp handle_status_update(status_data, socket) do + assign(socket, streaming_status: status_data.status) + end + + defp handle_streaming_complete(socket) do + # Keep streamed content and status in memory until payload arrives + # This allows saving content + code together in one operation + socket + end + + defp handle_streaming_payload_complete(payload_data, socket) do + require Logger + session = socket.assigns.session + content = socket.assigns.streaming_content + + Logger.debug( + "[Component] Processing complete payload for session #{session.id}" + ) + + # Save the assistant message with ALL data at once (content + code + usage + meta) + # This matches the non-streaming approach + message_attrs = %{ + role: :assistant, + content: content, + status: :success + } + + opts = [ + usage: payload_data[:usage] || %{}, + meta: payload_data[:meta], + code: payload_data[:code] + ] + + case AiAssistant.save_message(session, message_attrs, opts) do + {:ok, _updated_session} -> + Logger.debug( + "[Component] Successfully saved complete message with payload data" + ) + + # Mark all pending/processing user messages as success + # Need to reload first to get current state + {:ok, fresh_session} = AiAssistant.get_session(session.id) + + pending_user_messages = + AiAssistant.find_pending_user_messages(fresh_session) + + Logger.debug( + "[Component] Found #{length(pending_user_messages)} pending user messages to mark as success" + ) + + results = + Enum.map(pending_user_messages, fn user_message -> + Logger.debug( + "[Component] Updating user message #{user_message.id} from #{user_message.status} to :success" + ) + + AiAssistant.update_message_status( + fresh_session, + user_message, + :success + ) + end) + + Logger.debug("[Component] Update results: #{inspect(results)}") + + # Reload session again to get fresh user message statuses after updates + {:ok, final_session} = AiAssistant.get_session(session.id) + + Logger.debug( + "[Component] Final user message statuses: #{inspect(Enum.filter(final_session.messages, &(&1.role == :user)) |> Enum.map(&{&1.id, &1.status}))}" + ) + + # Clear loading state and streaming content + Logger.debug("[Component] 
Clearing pending_message and streaming state") + + socket = + socket + |> assign( + session: final_session, + pending_message: AsyncResult.ok(nil), + streaming_content: "", + streaming_status: nil + ) + + # Always call callback to notify message received (sets sending_ai_message: false) + case socket.assigns.callbacks[:on_message_received] do + callback when is_function(callback, 2) -> + code = payload_data[:code] + last_message = final_session.messages |> List.last() + callback.(code, last_message) + socket + + _ -> + socket + end + + {:error, error} -> + Logger.error( + "[Component] Failed to save complete message: #{inspect(error)}" + ) + + socket + |> assign( + pending_message: AsyncResult.ok(nil), + streaming_content: "", + streaming_status: nil + ) + end + end + + defp handle_streaming_error(error_data, socket) do + require Logger + session = socket.assigns.session + + Logger.error( + "[Component] Streaming error for session #{session.id}: #{error_data.error}" + ) + + # Find the user message that was being processed + user_messages = + Enum.filter( + session.messages, + &(&1.role == :user && &1.status == :processing) + ) + + # Mark user messages as failed + Enum.each(user_messages, fn msg -> + AiAssistant.update_message_status(session, msg, :error) + end) + + # Reload session with updated statuses + {:ok, updated_session} = AiAssistant.get_session(session.id) + + socket + |> assign( + session: updated_session, + pending_message: AsyncResult.loading(), + streaming_content: "", + streaming_status: nil, + streaming_error: error_data.error ) end @@ -201,6 +377,9 @@ defmodule LightningWeb.AiAssistant.Component do |> assign_new(:changeset, fn _ -> handler.validate_form(%{"content" => nil}) end) + |> assign_new(:streaming_content, fn -> "" end) + |> assign_new(:streaming_status, fn -> nil end) + |> assign_new(:streaming_error, fn -> nil end) end defp extract_message_id(%ChatSession{messages: messages}) do @@ -341,6 +520,47 @@ defmodule LightningWeb.AiAssistant.Component do end end + def handle_event("retry_streaming", _params, socket) do + # Re-submit the last user message + session = socket.assigns.session + + last_user_msg = + Enum.reverse(session.messages) + |> Enum.find(&(&1.role == :user)) + + if last_user_msg do + # Reset error state + socket = assign(socket, streaming_error: nil) + + # Resubmit message + case AiAssistant.retry_message(last_user_msg) do + {:ok, {_message, _oban_job}} -> + {:ok, updated_session} = AiAssistant.get_session(session.id) + + {:noreply, + assign(socket, + session: updated_session, + pending_message: AsyncResult.loading() + )} + + {:error, _} -> + {:noreply, put_flash(socket, :error, "Failed to retry request")} + end + else + {:noreply, socket} + end + end + + def handle_event("cancel_streaming", _params, socket) do + # Just clear the error state + socket = + socket + |> assign(streaming_error: nil, pending_message: AsyncResult.ok(nil)) + |> put_flash(:info, "Request cancelled") + + {:noreply, socket} + end + def handle_event( "select_assistant_message", %{"message-id" => message_id}, @@ -422,14 +642,23 @@ defmodule LightningWeb.AiAssistant.Component do end defp save_message(socket, action, content) do + require Logger + + Logger.debug( + "[AI Component] save_message called with action: #{inspect(action)}" + ) + result = case action do :new -> create_new_session(socket, content) :show -> add_to_existing_session(socket, content) end + Logger.debug("[AI Component] save_message result: #{inspect(result)}") + case result do {:ok, session} -> + Logger.debug("[AI 
Component] Calling handle_successful_save") handle_successful_save(socket, session, action) {:error, error} -> @@ -446,6 +675,11 @@ defmodule LightningWeb.AiAssistant.Component do end defp handle_successful_save(socket, session, :new) do + # Parent LiveView handles PubSub subscription via component registration + # Component receives updates via send_update from parent + require Logger + Logger.debug("[AI Component] New session created: #{session.id}") + socket |> assign(:session, session) |> assign(:pending_message, AsyncResult.loading()) @@ -453,6 +687,11 @@ defmodule LightningWeb.AiAssistant.Component do end defp handle_successful_save(socket, session, :show) do + # Parent LiveView handles PubSub subscription via component registration + # Component receives updates via send_update from parent + require Logger + Logger.debug("[AI Component] Message added to session: #{session.id}") + socket |> assign(:session, session) |> assign(:pending_message, AsyncResult.loading()) @@ -583,6 +822,9 @@ defmodule LightningWeb.AiAssistant.Component do handler={@handler} code_error={@code_error} mode={@mode} + streaming_status={@streaming_status} + streaming_content={@streaming_content} + streaming_error={@streaming_error} /> <% end %> @@ -1121,6 +1363,9 @@ defmodule LightningWeb.AiAssistant.Component do attr :handler, :any, required: true attr :code_error, :any, required: true attr :mode, :atom, required: true + attr :streaming_status, :string, default: nil + attr :streaming_content, :string, default: "" + attr :streaming_error, :string, default: nil defp render_individual_session(assigns) do assigns = assign(assigns, ai_feedback: ai_feedback()) @@ -1187,7 +1432,15 @@ defmodule LightningWeb.AiAssistant.Component do <.async_result assign={@pending_message}> <:loading> - <.assistant_typing_indicator handler={@handler} /> + <%= if @streaming_error do %> + <.streaming_error_state error={@streaming_error} target={@target} /> + <% else %> + <.assistant_typing_indicator + handler={@handler} + streaming_status={@streaming_status} + streaming_content={@streaming_content} + /> + <% end %> <:failed :let={failure}> @@ -1405,6 +1658,49 @@ defmodule LightningWeb.AiAssistant.Component do """ end + attr :error, :string, required: true + attr :target, :any, required: true + + defp streaming_error_state(assigns) do + ~H""" +{@error}
+ +Processing...
++ {@streaming_status || "Processing..."} +
+
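
Note for reviewers: the comments in `component.ex` say that the "Parent LiveView handles PubSub subscription via component registration" and that the "Component receives updates via send_update from parent", but that wiring sits outside this diff. Below is a minimal sketch of what such a parent might look like. The module name, the `"chat_session_id"` param, the `"ai-assistant"` component id, and the `Lightning.PubSub` server name are assumptions for illustration only; the topic name and message shapes come from `SSEStream` and the `update/2` clauses added above. `render/1` and the component's other required assigns are omitted.

```elixir
# Sketch only: module name, param, component id, and PubSub server name are
# hypothetical; topic and message shapes match SSEStream's broadcasts above.
defmodule MyAppWeb.AiAssistantParentLive do
  use Phoenix.LiveView

  alias LightningWeb.AiAssistant.Component, as: AiComponent

  @impl true
  def mount(%{"chat_session_id" => session_id}, _session, socket) do
    if connected?(socket) do
      # SSEStream broadcasts streaming events on "ai_session:#{session_id}"
      Phoenix.PubSub.subscribe(Lightning.PubSub, "ai_session:#{session_id}")
    end

    {:ok, assign(socket, :chat_session_id, session_id)}
  end

  # Relay each broadcast to the component; its update/2 clauses pattern-match
  # on these keys (streaming_chunk, status_update, streaming_complete, ...).
  @impl true
  def handle_info({:ai_assistant, :streaming_chunk, data}, socket) do
    send_update(AiComponent, id: "ai-assistant", streaming_chunk: data)
    {:noreply, socket}
  end

  def handle_info({:ai_assistant, :status_update, data}, socket) do
    send_update(AiComponent, id: "ai-assistant", status_update: data)
    {:noreply, socket}
  end

  def handle_info({:ai_assistant, :streaming_complete, data}, socket) do
    send_update(AiComponent, id: "ai-assistant", streaming_complete: data)
    {:noreply, socket}
  end

  def handle_info({:ai_assistant, :streaming_payload_complete, data}, socket) do
    send_update(AiComponent, id: "ai-assistant", streaming_payload_complete: data)
    {:noreply, socket}
  end

  def handle_info({:ai_assistant, :streaming_error, data}, socket) do
    send_update(AiComponent, id: "ai-assistant", streaming_error: data)
    {:noreply, socket}
  end
end
```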