From 516b0947a981398dba532ee76c41f507b6e797c7 Mon Sep 17 00:00:00 2001 From: Usama Date: Tue, 31 Mar 2026 15:32:47 +0000 Subject: [PATCH 1/3] Refactor audio processing in `audio_utils.py` to use temporary directories for file handling. This change improves the handling of variable bitrate (VBR) headers and ensures accurate duration metadata during audio conversion. Additionally, updated merging logic for audio files to utilize temporary files for chunk processing, enhancing overall reliability and performance. --- echo/server/dembrane/audio_utils.py | 142 ++++++++++++++++------------ 1 file changed, 83 insertions(+), 59 deletions(-) diff --git a/echo/server/dembrane/audio_utils.py b/echo/server/dembrane/audio_utils.py index 5b7e17fb3..6765cd03d 100644 --- a/echo/server/dembrane/audio_utils.py +++ b/echo/server/dembrane/audio_utils.py @@ -176,17 +176,23 @@ def convert_and_save_to_s3( if b"ftypM4A" in input_data[:50] or b"moov" in input_data[:200]: logger.info("Detected possible Apple Voice Memo signature") - # Process through ffmpeg - with tempfile.NamedTemporaryFile(suffix=f".{file_format}") as input_temp_file: - input_temp_file.write(input_data) - input_temp_file.flush() + # Write to a temp file (not pipe) so ffmpeg can seek back to write + # proper VBR headers (e.g. Xing for MP3), which are required for + # accurate duration metadata. + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, f"input.{file_format}") + output_path = os.path.join(tmpdir, f"output.{output_format}") + + with open(input_path, "wb") as f: + f.write(input_data) + if output_format == "ogg": if file_format.lower() in ["m4a", "mp4"]: logger.debug("Special handling for M4A files") process = ( - ffmpeg.input(input_temp_file.name, f=file_format) + ffmpeg.input(input_path, f=file_format) .output( - "pipe:1", + output_path, f="ogg", acodec="libvorbis", q="5", @@ -201,26 +207,25 @@ def convert_and_save_to_s3( "ignore_err", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: process = ( - ffmpeg.input(input_temp_file.name, f=file_format) - .output("pipe:1", f="ogg", acodec="libvorbis", q="5") + ffmpeg.input(input_path, f=file_format) + .output(output_path, f="ogg", acodec="libvorbis", q="5") .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) elif output_format == "mp3": process = ( - ffmpeg.input(input_temp_file.name, f=file_format) + ffmpeg.input(input_path, f=file_format) .output( - "pipe:1", + output_path, f="mp3", acodec="libmp3lame", q="5", strict="-2", - preset="veryfast", ) .global_args( "-hide_banner", @@ -230,33 +235,38 @@ def convert_and_save_to_s3( "ignore_err", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: raise ValueError(f"Not implemented for file format: {output_format}") - output, err = process.communicate(input=None) - - # Log the stderr output for debugging - err_text = err.decode() if err else "" - if err_text: - logger.debug(f"FFmpeg stderr: {err_text}") - - if process.returncode != 0: - error_message = err_text or "Unknown FFmpeg error" - if "No such file or directory" in error_message: - raise FFmpegError(f"Input file not found: {input_file_name}") - elif "Invalid data found when processing input" in error_message: - raise FFmpegError("Invalid or corrupted input file") - elif "Memory allocation error" in error_message: - raise FFmpegError( - f"Memory allocation failed - file too large. " - f"Required memory: {estimated_memory_mb:.1f}MB" - ) - else: - raise FFmpegError(f"FFmpeg processing failed: {error_message}") + _, err = process.communicate() + + # Log the stderr output for debugging + err_text = err.decode() if err else "" + if err_text: + logger.debug(f"FFmpeg stderr: {err_text}") + + if process.returncode != 0: + error_message = err_text or "Unknown FFmpeg error" + if "No such file or directory" in error_message: + raise FFmpegError(f"Input file not found: {input_file_name}") + elif "Invalid data found when processing input" in error_message: + raise FFmpegError("Invalid or corrupted input file") + elif "Memory allocation error" in error_message: + raise FFmpegError( + f"Memory allocation failed - file too large. " + f"Required memory: {estimated_memory_mb:.1f}MB" + ) + else: + raise FFmpegError(f"FFmpeg processing failed: {error_message}") + + if not os.path.exists(output_path): + raise ConversionError("FFmpeg produced no output file") + + with open(output_path, "rb") as f: + output = f.read() - # Verify we got valid output if not output: raise ConversionError("FFmpeg produced empty output") @@ -361,43 +371,53 @@ def merge_multiple_audio_files_and_save_to_s3( if not processed_data_streams: raise ValueError("No processed data streams") - with tempfile.NamedTemporaryFile(suffix=f".{output_format}") as temp_file: - for data_stream in processed_data_streams: - temp_file.write(data_stream.read()) + with tempfile.TemporaryDirectory() as tmpdir: + chunk_paths = [] + for i, data_stream in enumerate(processed_data_streams): + chunk_path = os.path.join(tmpdir, f"chunk_{i}.{output_format}") + with open(chunk_path, "wb") as f: + f.write(data_stream.read()) + chunk_paths.append(chunk_path) + + concat_list_path = os.path.join(tmpdir, "concat_list.txt") + with open(concat_list_path, "w") as f: + for p in chunk_paths: + f.write(f"file '{p}'\n") - temp_file.flush() + merged_path = os.path.join(tmpdir, f"merged.{output_format}") if output_format == "ogg": - # Final processing to ensure consistent output process = ( - ffmpeg.input(temp_file.name, format=output_format) - .output("pipe:1", f="ogg", acodec="libvorbis", q="5") + ffmpeg.input(concat_list_path, f="concat", safe=0) + .output(merged_path, f="ogg", acodec="libvorbis", q="5") .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) elif output_format == "mp3": process = ( - ffmpeg.input(temp_file.name, format=output_format) + ffmpeg.input(concat_list_path, f="concat", safe=0) .output( - "pipe:1", + merged_path, f="mp3", acodec="libmp3lame", q="5", - preset="veryfast", ) .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: raise ValueError(f"Not implemented for file format: {output_format}") - output, err = process.communicate(input=None) + _, err = process.communicate() - if process.returncode != 0: - error_message = err.decode() if err else "Unknown FFmpeg error" - raise FFmpegError(f"FFmpeg final processing failed: {error_message}") + if process.returncode != 0: + error_message = err.decode() if err else "Unknown FFmpeg error" + raise FFmpegError(f"FFmpeg final processing failed: {error_message}") + + with open(merged_path, "rb") as f: + output = f.read() # Save to S3 logger.info(f"Saving merged audio to S3 as {output_file_name}") @@ -655,9 +675,10 @@ def split_audio_chunk( s3_keys_created = [] # Track S3 keys for cleanup on failure try: - with tempfile.NamedTemporaryFile(suffix=f".{output_format}") as temp_file: - temp_file.write(get_stream_from_s3(updated_chunk_path).read()) - temp_file.flush() + with tempfile.TemporaryDirectory() as tmpdir: + source_path = os.path.join(tmpdir, f"source.{output_format}") + with open(source_path, "wb") as f: + f.write(get_stream_from_s3(updated_chunk_path).read()) # Phase 1: Split and upload all chunks to S3 for i in range(number_chunks): @@ -670,24 +691,27 @@ def split_audio_chunk( ) logger.debug(f"Extracting chunk {i + 1}/{number_chunks} starting at {start_time}s") + split_out_path = os.path.join(tmpdir, f"split_{i}.{output_format}") process = ( - ffmpeg.input(temp_file.name) + ffmpeg.input(source_path) .output( - "pipe:1", + split_out_path, ss=start_time, t=chunk_duration, f=output_format, - preset="veryfast", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) - chunk_output, err = process.communicate(input=None) + _, err = process.communicate() if process.returncode != 0: raise FFmpegError(f"ffmpeg splitting failed: {err.decode().strip()}") + with open(split_out_path, "rb") as f: + chunk_output = f.read() + s3_client.put_object( Bucket=STORAGE_S3_BUCKET, Key=s3_chunk_path, From 32b699641687b03379465d3668b6343bcac42509 Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 16 Apr 2026 11:23:20 +0000 Subject: [PATCH 2/3] fix: extract duration from local temp file instead of re-downloading from S3 --- echo/server/dembrane/api/conversation.py | 28 ++++---- echo/server/dembrane/audio_utils.py | 76 +++++++++++++++----- echo/server/tests/test_audio_utils.py | 90 ++++++++++++------------ 3 files changed, 119 insertions(+), 75 deletions(-) diff --git a/echo/server/dembrane/api/conversation.py b/echo/server/dembrane/api/conversation.py index 171804fcd..def7f0d5c 100644 --- a/echo/server/dembrane/api/conversation.py +++ b/echo/server/dembrane/api/conversation.py @@ -16,7 +16,6 @@ from dembrane.service import project_service, conversation_service, build_conversation_service from dembrane.directus import directus from dembrane.audio_utils import ( - get_duration_from_s3, sanitize_filename_component, merge_multiple_audio_files_and_save_to_s3, ) @@ -291,20 +290,14 @@ async def get_conversation_content( try: uuid = generate_uuid() - merged_path = await run_in_thread_pool( + merged_path, duration = await run_in_thread_pool( merge_multiple_audio_files_and_save_to_s3, file_paths, f"audio-conversations/merged-{sanitize_filename_component(conversation_id)}-{uuid}.mp3", "mp3", ) - logger.debug(f"Successfully merged audio to: {merged_path}") - - duration = -1.0 - try: - duration = await run_in_thread_pool(get_duration_from_s3, merged_path) - except Exception as e: - logger.error(f"Error getting duration from s3: {str(e)}") + logger.debug(f"Successfully merged audio to: {merged_path}, duration: {duration}s") await run_in_thread_pool( active_client.update_item, @@ -793,11 +786,18 @@ async def retranscribe_conversation( # because return_url is True assert isinstance(merged_audio_path, str) - duration = None - try: - duration = await run_in_thread_pool(get_duration_from_s3, merged_audio_path) - except Exception as e: - logger.error(f"Error getting duration from s3: {str(e)}") + # Duration was already computed and saved by get_conversation_content above + updated_conversation = await run_in_thread_pool( + active_client.get_items, + "conversation", + { + "query": { + "filter": {"id": {"_eq": conversation_id}}, + "fields": ["duration"], + } + }, + ) + duration = updated_conversation[0].get("duration") if updated_conversation else None # Create a new conversation new_conversation_id = generate_uuid() diff --git a/echo/server/dembrane/audio_utils.py b/echo/server/dembrane/audio_utils.py index 6765cd03d..e590aca87 100644 --- a/echo/server/dembrane/audio_utils.py +++ b/echo/server/dembrane/audio_utils.py @@ -306,7 +306,7 @@ def merge_multiple_audio_files_and_save_to_s3( input_file_names: List[str], output_file_name: str, output_format: str, -) -> str: +) -> tuple[str, float]: """Merge multiple audio files and save the result back to S3. Args: @@ -315,7 +315,7 @@ def merge_multiple_audio_files_and_save_to_s3( output_format: Format to convert to Returns: - str: Public URL of the processed file + tuple of (public_url, duration_seconds). Duration is -1.0 if probing fails. Raises: FFmpegError: For FFmpeg-specific errors @@ -416,32 +416,41 @@ def merge_multiple_audio_files_and_save_to_s3( error_message = err.decode() if err else "Unknown FFmpeg error" raise FFmpegError(f"FFmpeg final processing failed: {error_message}") - with open(merged_path, "rb") as f: - output = f.read() - - # Save to S3 - logger.info(f"Saving merged audio to S3 as {output_file_name}") - s3_client.put_object( - Bucket=STORAGE_S3_BUCKET, - Key=get_sanitized_s3_key(output_file_name), - Body=output, - ACL="private", - ) + # Probe duration from the local temp file before cleanup (no S3 round-trip) + audio_duration = -1.0 + try: + probe_data = probe_from_file(merged_path) + if "format" in probe_data and "duration" in probe_data["format"]: + audio_duration = float(probe_data["format"]["duration"]) + else: + logger.error("Duration not found in ffprobe output for merged file") + except Exception as e: + logger.error(f"Error probing duration from local merged file: {str(e)}") + + # Stream-upload to S3 from disk (never loads full file into memory) + s3_key = get_sanitized_s3_key(output_file_name) + logger.info(f"Saving merged audio to S3 as {output_file_name}") + s3_client.upload_file( + merged_path, + STORAGE_S3_BUCKET, + s3_key, + ExtraArgs={"ACL": "private"}, + ) info = s3_client.head_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(output_file_name) ) logger.debug(f"Head object from S3: {info}") - duration = time.time() - start_time + elapsed = time.time() - start_time logger.info( - f"Completed merging {len(input_file_names)} files in {duration:.2f}s. " - f"Total input size: {total_size_mb:.1f}MB" + f"Completed merging {len(input_file_names)} files in {elapsed:.2f}s. " + f"Total input size: {total_size_mb:.1f}MB, duration: {audio_duration:.1f}s" ) public_url = f"{STORAGE_S3_ENDPOINT}/{STORAGE_S3_BUCKET}/{output_file_name}" - return public_url + return public_url, audio_duration def probe_from_bytes(file_bytes: bytes, input_format: str) -> dict: @@ -579,6 +588,39 @@ def probe_from_bytes(file_bytes: bytes, input_format: str) -> dict: logger.warning(f"Failed to delete temporary file {temp_file_path}: {e}") +def probe_from_file(file_path: str) -> dict: + """Run ffprobe directly on a local file path. Avoids loading the file into memory.""" + if not os.path.exists(file_path): + raise ValueError(f"File not found: {file_path}") + + cmd = [ + "ffprobe", + "-hide_banner", + "-loglevel", + "warning", + "-print_format", + "json", + "-show_format", + "-show_streams", + file_path, + ] + + process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stderr_output = process.stderr.decode().strip() + if stderr_output: + logger.debug(f"ffprobe stderr: {stderr_output}") + + if process.returncode != 0: + raise ValueError(f"ffprobe failed on {file_path}: {stderr_output or 'Unknown error'}") + + output = process.stdout.decode() + if not output: + raise ValueError("ffprobe returned empty output") + + return json.loads(output) + + def probe_from_s3(file_name: str, input_format: str) -> dict: return probe_from_bytes(get_stream_from_s3(file_name).read(), input_format) diff --git a/echo/server/tests/test_audio_utils.py b/echo/server/tests/test_audio_utils.py index 9989cdd82..2875fda51 100644 --- a/echo/server/tests/test_audio_utils.py +++ b/echo/server/tests/test_audio_utils.py @@ -67,13 +67,13 @@ def test_convert_and_save_to_s3(file_name, output_format): output_url = convert_and_save_to_s3(input_file_key, output_file_key, output_format) assert output_url is not None, "output_url is None" - assert output_url.startswith( - STORAGE_S3_ENDPOINT - ), "output_url does not start with STORAGE_S3_ENDPOINT" + assert output_url.startswith(STORAGE_S3_ENDPOINT), ( + "output_url does not start with STORAGE_S3_ENDPOINT" + ) assert output_url.startswith("http"), "output_url does not start with http" - assert output_url.endswith( - f".{output_format}" - ), f"output_url does not end with .{output_format}" + assert output_url.endswith(f".{output_format}"), ( + f"output_url does not end with .{output_format}" + ) # Verify the file exists and has content head_response = s3_client.head_object( @@ -135,17 +135,18 @@ def test_merge_multiple_audio_files_and_save_to_s3(output_format): # merge files merged_file_key = "tests/" + generate_uuid() + "." + output_format - merged_file_url = merge_multiple_audio_files_and_save_to_s3( + merged_file_url, duration = merge_multiple_audio_files_and_save_to_s3( new_file_names, merged_file_key, output_format ) assert merged_file_url is not None, "merged_file_url is None" - assert merged_file_url.startswith( - STORAGE_S3_ENDPOINT - ), "merged_file_url does not start with STORAGE_S3_ENDPOINT" - assert merged_file_url.endswith( - f".{output_format}" - ), f"merged_file_url does not end with .{output_format}" + assert merged_file_url.startswith(STORAGE_S3_ENDPOINT), ( + "merged_file_url does not start with STORAGE_S3_ENDPOINT" + ) + assert merged_file_url.endswith(f".{output_format}"), ( + f"merged_file_url does not end with .{output_format}" + ) + assert duration > 0, f"Duration should be positive, got {duration}" response = s3_client.get_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(merged_file_key) @@ -245,12 +246,12 @@ def test_split_audio_chunk(file_name, output_format): split_chunk, ) assert item is not None, f"Failed to get split chunk {item['id']}" - assert item["path"].startswith( - "http" - ), f"Split chunk {item['path']} does not start with http" - assert item["path"].startswith( - STORAGE_S3_ENDPOINT - ), f"Split chunk {item['path']} does not start with STORAGE_S3_ENDPOINT" + assert item["path"].startswith("http"), ( + f"Split chunk {item['path']} does not start with http" + ) + assert item["path"].startswith(STORAGE_S3_ENDPOINT), ( + f"Split chunk {item['path']} does not start with STORAGE_S3_ENDPOINT" + ) data = s3_client.get_object( Bucket=STORAGE_S3_BUCKET, @@ -263,12 +264,12 @@ def test_split_audio_chunk(file_name, output_format): assert probe is not None, f"Failed to probe split chunk {item['path']}" assert probe["streams"] is not None, f"Probe result for {item['path']} has no streams" assert len(probe["streams"]) > 0, f"Probe result for {item['path']} has no streams" - assert ( - probe["streams"][0]["codec_type"] == "audio" - ), f"Probe result for {item['path']} is not an audio stream" - assert ( - float(probe["streams"][0]["duration"]) > 0 - ), f"Probe result for {item['path']} has no duration" + assert probe["streams"][0]["codec_type"] == "audio", ( + f"Probe result for {item['path']} is not an audio stream" + ) + assert float(probe["streams"][0]["duration"]) > 0, ( + f"Probe result for {item['path']} has no duration" + ) # delete the conversation chunk directus.delete_item("conversation_chunk", chunk_id) @@ -326,14 +327,14 @@ def test_probe_from_bytes(file_name: str): assert probe_result is not None, f"Failed to probe {file_name}" assert "streams" in probe_result, f"No streams in probe result for {file_name}" assert len(probe_result["streams"]) > 0, f"No streams in probe result for {file_name}" - assert ( - probe_result["streams"][0]["codec_type"] == "audio" - ), f"Not an audio stream for {file_name}" + assert probe_result["streams"][0]["codec_type"] == "audio", ( + f"Not an audio stream for {file_name}" + ) assert "format" in probe_result, f"No format information in probe result for {file_name}" assert "duration" in probe_result["format"], f"No duration in probe result for {file_name}" - assert ( - float(probe_result["format"]["duration"]) > 0 - ), f"Duration not positive for {file_name}" + assert float(probe_result["format"]["duration"]) > 0, ( + f"Duration not positive for {file_name}" + ) logger.info(f"Successfully probed {file_name}") @@ -367,14 +368,14 @@ def test_probe_from_s3(file_name: str): assert probe_result is not None, f"Failed to probe {s3_key}" assert "streams" in probe_result, f"No streams in probe result for {s3_key}" assert len(probe_result["streams"]) > 0, f"No streams in probe result for {s3_key}" - assert ( - probe_result["streams"][0]["codec_type"] == "audio" - ), f"Not an audio stream for {s3_key}" + assert probe_result["streams"][0]["codec_type"] == "audio", ( + f"Not an audio stream for {s3_key}" + ) assert "format" in probe_result, f"No format information in probe result for {s3_key}" assert "duration" in probe_result["format"], f"No duration in probe result for {s3_key}" - assert ( - float(probe_result["format"]["duration"]) > 0 - ), f"Duration not positive for {s3_key}" + assert float(probe_result["format"]["duration"]) > 0, ( + f"Duration not positive for {s3_key}" + ) logger.info(f"Successfully probed {s3_key} from S3") @@ -452,17 +453,18 @@ def test_merge_specific_format_pairs(file_formats, output_format): merged_file_key = ( f"tests/{generate_uuid()}-{format1}_{format2}_to_{output_format}.{output_format}" ) - merged_file_url = merge_multiple_audio_files_and_save_to_s3( + merged_file_url, duration = merge_multiple_audio_files_and_save_to_s3( new_file_names, merged_file_key, output_format ) assert merged_file_url is not None, "merged_file_url is None" - assert merged_file_url.startswith( - STORAGE_S3_ENDPOINT - ), "merged_file_url does not start with STORAGE_S3_ENDPOINT" - assert merged_file_url.endswith( - f".{output_format}" - ), f"merged_file_url does not end with .{output_format}" + assert merged_file_url.startswith(STORAGE_S3_ENDPOINT), ( + "merged_file_url does not start with STORAGE_S3_ENDPOINT" + ) + assert merged_file_url.endswith(f".{output_format}"), ( + f"merged_file_url does not end with .{output_format}" + ) + assert duration > 0, f"Duration should be positive, got {duration}" # Verify the merged file response = s3_client.get_object( From 5ce013984facffc549fdf5f1ff02e2062842bc32 Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 16 Apr 2026 12:54:31 +0000 Subject: [PATCH 3/3] fix: finish all conversations on multi-file dashboard upload --- echo/frontend/src/lib/api.ts | 53 ++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/echo/frontend/src/lib/api.ts b/echo/frontend/src/lib/api.ts index 4f6a31f87..9f1789339 100644 --- a/echo/frontend/src/lib/api.ts +++ b/echo/frontend/src/lib/api.ts @@ -708,10 +708,12 @@ export const initiateAndUploadConversationChunk = async (payload: { const fileQueue = [...Array(payload.chunks.length).keys()]; const inProgress = new Set(); - // Track conversation ID for finish hook - let conversationId: string | null = null; + // Each uploaded file creates its own conversation. + // We need to remember every conversation ID so we can call finishConversation() + // on each one after uploads complete. Without this, only the first conversation + // would enter the processing pipeline (transcription, merging, duration, etc.). + const conversationIdByFileIndex = new Map(); - // Process a single file const processFile = async (i: number) => { const chunk = payload.chunks[i]; let fileName = ""; @@ -726,7 +728,6 @@ export const initiateAndUploadConversationChunk = async (payload: { const source = payload.source || "PORTAL_AUDIO"; try { - // Create the conversation first (one per file in this implementation) const conversation = await initiateConversation({ email: payload.email, name: `${payload.namePrefix} - ${fileName}`, @@ -736,10 +737,7 @@ export const initiateAndUploadConversationChunk = async (payload: { tagIdList: payload.tagIdList, }); - // Store conversation ID for finish hook - if (!conversationId) { - conversationId = conversation.id; - } + conversationIdByFileIndex.set(i, conversation.id); // Upload using new presigned URL method const uploadResult = await uploadConversationChunkWithPresignedUrl({ @@ -811,20 +809,33 @@ export const initiateAndUploadConversationChunk = async (payload: { toast.success(`All ${payload.chunks.length} file(s) uploaded successfully`); } - // IMPORTANT: Trigger finish hook after all uploads complete - // This triggers: audio merging, ETL pipeline, summarization - if (conversationId && failures.length === 0) { - console.log( - `[Upload] Triggering finish hook for conversation ${conversationId}`, - ); - try { - await finishConversation(conversationId); - console.log("[Upload] Finish hook triggered successfully"); - } catch (error) { - console.error("[Upload] Failed to trigger finish hook:", error); - // Don't throw - uploads succeeded, this is just post-processing + // Collect conversation IDs for files that uploaded successfully. + // Failed uploads are skipped — they have no conversation to finish. + const succeededConversationIds = results.reduce((ids, result, i) => { + const isFailure = result && "error" in result; + const conversationId = conversationIdByFileIndex.get(i); + if (!isFailure && conversationId) { + ids.push(conversationId); } - } + return ids; + }, []); + + // Call finishConversation() for every successful upload, concurrently. + // This triggers the backend pipeline: transcription → merging → duration → summary. + // Each call is independent, so one failure won't block the others. + await Promise.all( + succeededConversationIds.map(async (conversationId) => { + try { + await finishConversation(conversationId); + console.log(`[Upload] Finish hook triggered for conversation ${conversationId}`); + } catch (error) { + console.error( + `[Upload] Failed to finish conversation ${conversationId}:`, + error, + ); + } + }), + ); return results; };