diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index b79cc5c7e3..c633d5ce91 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -80,7 +80,7 @@ pub async fn upload_multipart_presign_part( .map_err(|err| format!("api/upload_multipart_presign_part/request: {err}"))?; if !resp.status().is_success() { - let status = resp.status(); + let status = resp.status().as_u16(); let error_body = resp .text() .await @@ -155,7 +155,7 @@ pub async fn upload_multipart_complete( .map_err(|err| format!("api/upload_multipart_complete/request: {err}"))?; if !resp.status().is_success() { - let status = resp.status(); + let status = resp.status().as_u16(); let error_body = resp .text() .await @@ -211,7 +211,7 @@ pub async fn upload_signed( .map_err(|err| format!("api/upload_signed/request: {err}"))?; if !resp.status().is_success() { - let status = resp.status(); + let status = resp.status().as_u16(); let error_body = resp .text() .await @@ -245,7 +245,7 @@ pub async fn desktop_video_progress( .map_err(|err| format!("api/desktop_video_progress/request: {err}"))?; if !resp.status().is_success() { - let status = resp.status(); + let status = resp.status().as_u16(); let error_body = resp .text() .await diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index cbea530428..2a0eb78225 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -25,7 +25,6 @@ mod target_select_overlay; mod thumbnails; mod tray; mod upload; -mod upload_legacy; mod web_api; mod window_exclusion; mod windows; @@ -2598,8 +2597,8 @@ async fn resume_uploads(app: AppHandle) -> Result<(), String> { app.clone(), file_path, pre_created_video, - None, recording_dir, + None, ); } UploadMeta::SinglePartUpload { diff --git a/apps/desktop/src-tauri/src/posthog.rs b/apps/desktop/src-tauri/src/posthog.rs index 87dc404a0c..394d5e9c90 100644 --- a/apps/desktop/src-tauri/src/posthog.rs +++ b/apps/desktop/src-tauri/src/posthog.rs @@ -3,18 +3,40 @@ use tracing::error; #[derive(Debug)] pub enum PostHogEvent { - MultipartUploadComplete { duration: Duration }, - MultipartUploadFailed { duration: Duration, error: String }, + MultipartUploadComplete { + // Upload duration + duration: Duration, + // Length of the video + length: Duration, + // Size of the file in megabytes + size: u64, + }, + MultipartUploadFailed { + // Upload duration + duration: Duration, + // Error message + error: String, + }, } impl From for posthog_rs::Event { fn from(event: PostHogEvent) -> Self { - match event { - PostHogEvent::MultipartUploadComplete { duration } => { + let mut e = match event { + PostHogEvent::MultipartUploadComplete { + duration, + length, + size, + } => { let mut e = posthog_rs::Event::new_anon("multipart_upload_complete"); e.insert_prop("duration", duration.as_secs()) .map_err(|err| error!("Error adding PostHog property: {err:?}")) .ok(); + e.insert_prop("length", length.as_secs()) + .map_err(|err| error!("Error adding PostHog property: {err:?}")) + .ok(); + e.insert_prop("size", size) + .map_err(|err| error!("Error adding PostHog property: {err:?}")) + .ok(); e } PostHogEvent::MultipartUploadFailed { duration, error } => { @@ -27,7 +49,13 @@ impl From for posthog_rs::Event { .ok(); e } - } + }; + + e.insert_prop("cap_version", env!("CARGO_PKG_VERSION")) + .map_err(|err| error!("Error adding PostHog property: {err:?}")) + .ok(); + + e } } diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index e2e8ffa56f..9c8c11195b 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -533,14 +533,6 @@ pub async fn start_recording( return Err("Video upload info not found".to_string()); }; - let progressive_upload = InstantMultipartUpload::spawn( - app_handle, - recording_dir.join("content/output.mp4"), - video_upload_info.clone(), - Some(finish_upload_rx), - recording_dir.clone(), - ); - let mut builder = instant_recording::Actor::builder( recording_dir.clone(), inputs.capture_target.clone(), @@ -567,6 +559,14 @@ pub async fn start_recording( e.to_string() })?; + let progressive_upload = InstantMultipartUpload::spawn( + app_handle, + recording_dir.join("content/output.mp4"), + video_upload_info.clone(), + recording_dir.clone(), + Some(finish_upload_rx), + ); + InProgressRecording::Instant { handle, progressive_upload, @@ -734,25 +734,32 @@ pub async fn restart_recording( pub async fn delete_recording(app: AppHandle, state: MutableState<'_, App>) -> Result<(), String> { let recording_data = { let mut app_state = state.write().await; - if let Some(recording) = app_state.clear_current_recording() { - let recording_dir = recording.recording_dir().clone(); - let video_id = match &recording { - InProgressRecording::Instant { - video_upload_info, .. - } => Some(video_upload_info.id.clone()), - _ => None, - }; - Some((recording, recording_dir, video_id)) - } else { - None - } + app_state.clear_current_recording() }; - if let Some((_, recording_dir, video_id)) = recording_data { + if let Some(recording) = recording_data { + let recording_dir = recording.recording_dir().clone(); CurrentRecordingChanged.emit(&app).ok(); RecordingStopped {}.emit(&app).ok(); - // let _ = recording.cancel().await; + let video_id = match &recording { + InProgressRecording::Instant { + video_upload_info, + progressive_upload, + .. + } => { + debug!( + "User deleted recording. Aborting multipart upload for {:?}", + video_upload_info.id + ); + progressive_upload.handle.abort(); + + Some(video_upload_info.id.clone()) + } + _ => None, + }; + + let _ = recording.cancel().await; std::fs::remove_dir_all(&recording_dir).ok(); diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 61a4ce4257..21e0268c53 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -33,10 +33,10 @@ use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt, BufReader}, task::{self, JoinHandle}, - time::{self, Instant}, + time::{self, Instant, timeout}, }; use tokio_util::io::ReaderStream; -use tracing::{Span, debug, error, info, instrument, trace}; +use tracing::{Span, debug, error, info, info_span, instrument, trace}; use tracing_futures::Instrument; pub struct UploadedItem { @@ -65,7 +65,6 @@ pub async fn upload_video( meta: S3VideoMeta, channel: Option>, ) -> Result { - println!("Uploading video {video_id}..."); info!("Uploading video {video_id}..."); let start = Instant::now(); @@ -103,9 +102,10 @@ pub async fn upload_video( .map_err(|e| error!("Failed to get video metadata: {e}")) .ok(); - api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata).await?; + api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata.clone()) + .await?; - Ok(()) + Ok(metadata) }; // TODO: We don't report progress on image upload @@ -126,8 +126,15 @@ pub async fn upload_video( tokio::join!(video_fut, thumbnail_fut); async_capture_event(match &video_result { - Ok(()) => PostHogEvent::MultipartUploadComplete { + Ok(meta) => PostHogEvent::MultipartUploadComplete { duration: start.elapsed(), + length: meta + .as_ref() + .map(|v| Duration::from_secs(v.duration_in_secs as u64)) + .unwrap_or_default(), + size: std::fs::metadata(file_path) + .map(|m| ((m.len() as f64) / 1_000_000.0) as u64) + .unwrap_or_default(), }, Err(err) => PostHogEvent::MultipartUploadFailed { duration: start.elapsed(), @@ -229,13 +236,21 @@ pub async fn create_or_get_video( error: String, } - if let Ok(error) = response.json::().await { + let status = response.status(); + let body = response.text().await; + + if let Some(error) = body + .as_ref() + .ok() + .and_then(|body| serde_json::from_str::(&*body).ok()) + && status == StatusCode::FORBIDDEN + { if error.error == "upgrade_required" { return Err(AuthedApiError::UpgradeRequired); } } - return Err("Unknown error uploading video".into()); + return Err(format!("create_or_get_video/error/{status}: {body:?}").into()); } let response_text = response @@ -318,23 +333,30 @@ impl InstantMultipartUpload { app: AppHandle, file_path: PathBuf, pre_created_video: VideoUploadInfo, - realtime_upload_done: Option>, recording_dir: PathBuf, + realtime_upload_done: Option>, ) -> Self { Self { handle: spawn_actor(async move { let start = Instant::now(); let result = Self::run( app, - file_path, + file_path.clone(), pre_created_video, - realtime_upload_done, recording_dir, + realtime_upload_done, ) .await; async_capture_event(match &result { - Ok(()) => PostHogEvent::MultipartUploadComplete { + Ok(meta) => PostHogEvent::MultipartUploadComplete { duration: start.elapsed(), + length: meta + .as_ref() + .map(|v| Duration::from_secs(v.duration_in_secs as u64)) + .unwrap_or_default(), + size: std::fs::metadata(file_path) + .map(|m| ((m.len() as f64) / 1_000_000.0) as u64) + .unwrap_or_default(), }, Err(err) => PostHogEvent::MultipartUploadFailed { duration: start.elapsed(), @@ -351,9 +373,9 @@ impl InstantMultipartUpload { app: AppHandle, file_path: PathBuf, pre_created_video: VideoUploadInfo, - realtime_video_done: Option>, recording_dir: PathBuf, - ) -> Result<(), AuthedApiError> { + realtime_video_done: Option>, + ) -> Result, AuthedApiError> { let video_id = pre_created_video.id.clone(); debug!("Initiating multipart upload for {video_id}..."); @@ -398,7 +420,8 @@ impl InstantMultipartUpload { .map_err(|e| error!("Failed to get video metadata: {e}")) .ok(); - api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata).await?; + api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata.clone()) + .await?; info!("Multipart upload complete for {video_id}."); let mut project_meta = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { @@ -411,7 +434,7 @@ impl InstantMultipartUpload { let _ = app.clipboard().write_text(pre_created_video.link.clone()); - Ok(()) + Ok(metadata) } } @@ -459,7 +482,17 @@ pub fn from_pending_file_to_chunks( realtime_upload_done: Option>, ) -> impl Stream> { try_stream! { - let mut file = tokio::fs::File::open(&path).await?; + let mut file = timeout(Duration::from_secs(20), async move { + loop { + if let Ok(file) = tokio::fs::File::open(&path).await.map_err(|err| error!("from_pending_file_to_chunks/open: {err:?}")) { + break file; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to open file. The recording pipeline may have crashed?"))?; + let mut part_number = 1; let mut last_read_position: u64 = 0; let mut realtime_is_done = realtime_upload_done.as_ref().map(|_| false); @@ -473,10 +506,10 @@ pub fn from_pending_file_to_chunks( match realtime_receiver.try_recv() { Ok(_) => realtime_is_done = Some(true), Err(flume::TryRecvError::Empty) => {}, - Err(_) => yield Err(std::io::Error::new( - std::io::ErrorKind::Interrupted, - "Realtime generation failed" - ))?, + // This means all senders where dropped. + // This can assume this means realtime is done. + // It possibly means something has gone wrong but that's not the uploader's problem. + Err(_) => realtime_is_done = Some(true), } } } @@ -646,7 +679,7 @@ fn multipart_uploader( .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))? .put(&presigned_url) .header("Content-Length", chunk.len()) - .timeout(Duration::from_secs(120)).body(chunk); + .timeout(Duration::from_secs(5 * 60)).body(chunk); if let Some(md5_sum) = &md5_sum { req = req.header("Content-MD5", md5_sum); @@ -654,6 +687,7 @@ fn multipart_uploader( let resp = req .send() + .instrument(info_span!("send", size = size)) .await .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?; @@ -698,7 +732,6 @@ pub async fn singlepart_uploader( .map_err(|err| format!("singlepart_uploader/client: {err:?}"))? .put(&presigned_url) .header("Content-Length", total_size) - .timeout(Duration::from_secs(120)) .body(reqwest::Body::wrap_stream(stream)) .send() .await diff --git a/apps/desktop/src-tauri/src/upload_legacy.rs b/apps/desktop/src-tauri/src/upload_legacy.rs deleted file mode 100644 index 140f73a537..0000000000 --- a/apps/desktop/src-tauri/src/upload_legacy.rs +++ /dev/null @@ -1,1142 +0,0 @@ -//! This is the legacy uploading module. -//! We are keeping it for now as an easy fallback. -//! -//! You should avoid making changes to it, make changes to the new upload module instead. - -// credit @filleduchaos - -use crate::api::S3VideoMeta; -use crate::web_api::{AuthedApiError, ManagerExt}; -use crate::{UploadProgress, VideoUploadInfo}; -use ffmpeg::ffi::AV_TIME_BASE; -use flume::Receiver; -use futures::StreamExt; -use image::ImageReader; -use image::codecs::jpeg::JpegEncoder; -use reqwest::StatusCode; -use reqwest::header::CONTENT_LENGTH; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use specta::Type; -use std::{ - path::PathBuf, - time::{Duration, Instant}, -}; -use tauri::{AppHandle, ipc::Channel}; -use tauri_plugin_clipboard_manager::ClipboardExt; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; -use tokio::task::{self, JoinHandle}; -use tokio::time::sleep; -use tracing::{debug, error, info, trace, warn}; - -#[derive(Deserialize, Serialize, Clone, Type, Debug)] -pub struct S3UploadMeta { - id: String, -} - -#[derive(Deserialize, Clone, Debug)] -pub struct CreateErrorResponse { - error: String, -} - -// fn deserialize_empty_object_as_string<'de, D>(deserializer: D) -> Result -// where -// D: Deserializer<'de>, -// { -// struct StringOrObject; - -// impl<'de> de::Visitor<'de> for StringOrObject { -// type Value = String; - -// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { -// formatter.write_str("string or empty object") -// } - -// fn visit_str(self, value: &str) -> Result -// where -// E: de::Error, -// { -// Ok(value.to_string()) -// } - -// fn visit_string(self, value: String) -> Result -// where -// E: de::Error, -// { -// Ok(value) -// } - -// fn visit_map(self, _map: M) -> Result -// where -// M: de::MapAccess<'de>, -// { -// // Return empty string for empty objects -// Ok(String::new()) -// } -// } - -// deserializer.deserialize_any(StringOrObject) -// } - -// impl S3UploadMeta { -// pub fn id(&self) -> &str { -// &self.id -// } - -// // pub fn new(id: String) -> Self { -// // Self { id } -// // } -// } - -pub struct UploadedVideo { - pub link: String, - pub id: String, - #[allow(unused)] - pub config: S3UploadMeta, -} - -pub struct UploadedImage { - pub link: String, - pub id: String, -} - -// pub struct UploadedAudio { -// pub link: String, -// pub id: String, -// pub config: S3UploadMeta, -// } - -pub struct UploadProgressUpdater { - video_state: Option, - app: AppHandle, - video_id: String, -} - -struct VideoProgressState { - uploaded: u64, - total: u64, - pending_task: Option>, - last_update_time: Instant, -} - -impl UploadProgressUpdater { - pub fn new(app: AppHandle, video_id: String) -> Self { - Self { - video_state: None, - app, - video_id, - } - } - - pub fn update(&mut self, uploaded: u64, total: u64) { - let should_send_immediately = { - let state = self.video_state.get_or_insert_with(|| VideoProgressState { - uploaded, - total, - pending_task: None, - last_update_time: Instant::now(), - }); - - // Cancel any pending task - if let Some(handle) = state.pending_task.take() { - handle.abort(); - } - - state.uploaded = uploaded; - state.total = total; - state.last_update_time = Instant::now(); - - // Send immediately if upload is complete - uploaded >= total - }; - - let app = self.app.clone(); - if should_send_immediately { - tokio::spawn({ - let video_id = self.video_id.clone(); - async move { - Self::send_api_update(&app, video_id, uploaded, total).await; - } - }); - - // Clear state since upload is complete - self.video_state = None; - } else { - // Schedule delayed update - let handle = { - let video_id = self.video_id.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(2)).await; - Self::send_api_update(&app, video_id, uploaded, total).await; - }) - }; - - if let Some(state) = &mut self.video_state { - state.pending_task = Some(handle); - } - } - } - - async fn send_api_update(app: &AppHandle, video_id: String, uploaded: u64, total: u64) { - let response = app - .authed_api_request("/api/desktop/video/progress", |client, url| { - client - .post(url) - .header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION")) - .json(&json!({ - "videoId": video_id, - "uploaded": uploaded, - "total": total, - "updatedAt": chrono::Utc::now().to_rfc3339() - })) - }) - .await; - - match response { - Ok(resp) if resp.status().is_success() => { - trace!("Progress update sent successfully"); - } - Ok(resp) => error!("Failed to send progress update: {}", resp.status()), - Err(err) => error!("Failed to send progress update: {err}"), - } - } -} - -pub async fn upload_video( - app: &AppHandle, - video_id: String, - file_path: PathBuf, - existing_config: Option, - screenshot_path: Option, - meta: Option, - channel: Option>, -) -> Result { - println!("Uploading video {video_id}..."); - - let client = reqwest::Client::new(); - let s3_config = match existing_config { - Some(config) => config, - None => create_or_get_video(app, false, Some(video_id.clone()), None, meta).await?, - }; - - let presigned_put = presigned_s3_put( - app, - PresignedS3PutRequest { - video_id: video_id.clone(), - subpath: "result.mp4".to_string(), - method: PresignedS3PutRequestMethod::Put, - meta: Some(build_video_meta(&file_path)?), - }, - ) - .await?; - - let file = tokio::fs::File::open(&file_path) - .await - .map_err(|e| format!("Failed to open file: {e}"))?; - - let metadata = file - .metadata() - .await - .map_err(|e| format!("Failed to get file metadata: {e}"))?; - - let total_size = metadata.len(); - - let reader_stream = tokio_util::io::ReaderStream::new(file); - - let mut bytes_uploaded = 0u64; - let mut progress = UploadProgressUpdater::new(app.clone(), video_id); - - let progress_stream = reader_stream.inspect(move |chunk| { - if let Ok(chunk) = chunk { - bytes_uploaded += chunk.len() as u64; - } - - if bytes_uploaded > 0 { - if let Some(channel) = &channel { - channel - .send(UploadProgress { - progress: bytes_uploaded as f64 / total_size as f64, - }) - .ok(); - } - - progress.update(bytes_uploaded, total_size); - } - }); - - let screenshot_upload = match screenshot_path { - Some(screenshot_path) if screenshot_path.exists() => { - Some(prepare_screenshot_upload(app, &s3_config, screenshot_path)) - } - _ => None, - }; - - let video_upload = client - .put(presigned_put) - .body(reqwest::Body::wrap_stream(progress_stream)) - .header(CONTENT_LENGTH, metadata.len()); - - let (video_upload, screenshot_result): ( - Result, - Option>, - ) = tokio::join!(video_upload.send(), async { - if let Some(screenshot_req) = screenshot_upload { - Some(screenshot_req.await) - } else { - None - } - }); - - let response = video_upload.map_err(|e| format!("Failed to send upload file request: {e}"))?; - - if response.status().is_success() { - println!("Video uploaded successfully"); - - if let Some(Ok(screenshot_response)) = screenshot_result { - if screenshot_response.status().is_success() { - println!("Screenshot uploaded successfully"); - } else { - println!( - "Failed to upload screenshot: {}", - screenshot_response.status() - ); - } - } - - return Ok(UploadedVideo { - link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, - id: s3_config.id.clone(), - config: s3_config, - }); - } - - let status = response.status(); - let error_body = response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - tracing::error!( - "Failed to upload file. Status: {}. Body: {}", - status, - error_body - ); - Err(format!("Failed to upload file. Status: {status}. Body: {error_body}").into()) -} - -pub async fn upload_image(app: &AppHandle, file_path: PathBuf) -> Result { - let file_name = file_path - .file_name() - .and_then(|name| name.to_str()) - .ok_or("Invalid file path")? - .to_string(); - - let client = reqwest::Client::new(); - let s3_config = create_or_get_video(app, true, None, None, None).await?; - - let presigned_put = presigned_s3_put( - app, - PresignedS3PutRequest { - video_id: s3_config.id.clone(), - subpath: file_name, - method: PresignedS3PutRequestMethod::Put, - meta: None, - }, - ) - .await?; - - let file_content = tokio::fs::read(&file_path) - .await - .map_err(|e| format!("Failed to read file: {e}"))?; - - let response = client - .put(presigned_put) - .header(CONTENT_LENGTH, file_content.len()) - .body(file_content) - .send() - .await - .map_err(|e| format!("Failed to send upload file request: {e}"))?; - - if response.status().is_success() { - println!("File uploaded successfully"); - return Ok(UploadedImage { - link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, - id: s3_config.id, - }); - } - - let status = response.status(); - let error_body = response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - tracing::error!( - "Failed to upload file. Status: {}. Body: {}", - status, - error_body - ); - Err(format!( - "Failed to upload file. Status: {status}. Body: {error_body}" - )) -} - -pub async fn create_or_get_video( - app: &AppHandle, - is_screenshot: bool, - video_id: Option, - name: Option, - meta: Option, -) -> Result { - let mut s3_config_url = if let Some(id) = video_id { - format!("/api/desktop/video/create?recordingMode=desktopMP4&videoId={id}") - } else if is_screenshot { - "/api/desktop/video/create?recordingMode=desktopMP4&isScreenshot=true".to_string() - } else { - "/api/desktop/video/create?recordingMode=desktopMP4".to_string() - }; - - if let Some(name) = name { - s3_config_url.push_str(&format!("&name={name}")); - } - - if let Some(meta) = meta { - s3_config_url.push_str(&format!("&durationInSecs={}", meta.duration_in_secs)); - s3_config_url.push_str(&format!("&width={}", meta.width)); - s3_config_url.push_str(&format!("&height={}", meta.height)); - if let Some(fps) = meta.fps { - s3_config_url.push_str(&format!("&fps={}", fps)); - } - } - - let response = app - .authed_api_request(s3_config_url, |client, url| client.get(url)) - .await - .map_err(|e| format!("Failed to send request to Next.js handler: {e}"))?; - - if response.status() == StatusCode::UNAUTHORIZED { - return Err("Failed to authenticate request; please log in again".into()); - } - - if response.status() != StatusCode::OK { - if let Ok(error) = response.json::().await { - if error.error == "upgrade_required" { - return Err( - "You must upgrade to Cap Pro to upload recordings over 5 minutes in length" - .into(), - ); - } - - return Err(format!("server error: {}", error.error)); - } - - return Err("Unknown error uploading video".into()); - } - - let response_text = response - .text() - .await - .map_err(|e| format!("Failed to read response body: {e}"))?; - - let config = serde_json::from_str::(&response_text).map_err(|e| { - format!("Failed to deserialize response: {e}. Response body: {response_text}") - })?; - - Ok(config) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct PresignedS3PutRequest { - video_id: String, - subpath: String, - method: PresignedS3PutRequestMethod, - #[serde(flatten)] - meta: Option, -} - -#[derive(Serialize)] -#[serde(rename_all = "lowercase")] -pub enum PresignedS3PutRequestMethod { - #[allow(unused)] - Post, - Put, -} - -async fn presigned_s3_put(app: &AppHandle, body: PresignedS3PutRequest) -> Result { - #[derive(Deserialize, Debug)] - struct Data { - url: String, - } - - #[derive(Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - struct Wrapper { - presigned_put_data: Data, - } - - let response = app - .authed_api_request("/api/upload/signed", |client, url| { - client.post(url).json(&body) - }) - .await - .map_err(|e| format!("Failed to send request to Next.js handler: {e}"))?; - - if response.status() == StatusCode::UNAUTHORIZED { - return Err("Failed to authenticate request; please log in again".into()); - } - - let Wrapper { presigned_put_data } = response - .json::() - .await - .map_err(|e| format!("Failed to deserialize server response: {e}"))?; - - Ok(presigned_put_data.url) -} - -pub fn build_video_meta(path: &PathBuf) -> Result { - let input = - ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; - let video_stream = input - .streams() - .best(ffmpeg::media::Type::Video) - .ok_or_else(|| "Failed to find appropriate video stream in file".to_string())?; - - let video_codec = ffmpeg::codec::context::Context::from_parameters(video_stream.parameters()) - .map_err(|e| format!("Unable to read video codec information: {e}"))?; - let video = video_codec - .decoder() - .video() - .map_err(|e| format!("Unable to get video decoder: {e}"))?; - - Ok(S3VideoMeta { - duration_in_secs: input.duration() as f64 / AV_TIME_BASE as f64, - width: video.width(), - height: video.height(), - fps: video - .frame_rate() - .map(|v| (v.numerator() as f32 / v.denominator() as f32)), - }) -} - -// fn build_audio_upload_body( -// path: &PathBuf, -// base: S3UploadBody, -// ) -> Result { -// let input = -// ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; -// let stream = input -// .streams() -// .best(ffmpeg::media::Type::Audio) -// .ok_or_else(|| "Failed to find appropriate audio stream in file".to_string())?; - -// let duration_millis = input.duration() as f64 / 1000.; - -// let codec = ffmpeg::codec::context::Context::from_parameters(stream.parameters()) -// .map_err(|e| format!("Unable to read audio codec information: {e}"))?; -// let codec_name = codec.id(); - -// let is_mp3 = path.extension().is_some_and(|ext| ext == "mp3"); - -// Ok(S3AudioUploadBody { -// base, -// duration: duration_millis.to_string(), -// audio_codec: format!("{codec_name:?}").replace("Id::", "").to_lowercase(), -// is_mp3, -// }) -// } - -pub async fn prepare_screenshot_upload( - app: &AppHandle, - s3_config: &S3UploadMeta, - screenshot_path: PathBuf, -) -> Result { - let presigned_put = presigned_s3_put( - app, - PresignedS3PutRequest { - video_id: s3_config.id.clone(), - subpath: "screenshot/screen-capture.jpg".to_string(), - method: PresignedS3PutRequestMethod::Put, - meta: None, - }, - ) - .await?; - - let compressed_image = compress_image(screenshot_path).await?; - - reqwest::Client::new() - .put(presigned_put) - .header(CONTENT_LENGTH, compressed_image.len()) - .body(compressed_image) - .send() - .await - .map_err(|e| format!("Error uploading screenshot: {e}")) -} - -async fn compress_image(path: PathBuf) -> Result, String> { - task::spawn_blocking(move || { - let img = ImageReader::open(&path) - .map_err(|e| format!("Failed to open image: {e}"))? - .decode() - .map_err(|e| format!("Failed to decode image: {e}"))?; - - let new_width = img.width() / 2; - let new_height = img.height() / 2; - - let resized_img = img.resize(new_width, new_height, image::imageops::FilterType::Nearest); - - let mut buffer = Vec::new(); - let mut encoder = JpegEncoder::new_with_quality(&mut buffer, 30); - encoder - .encode( - resized_img.as_bytes(), - new_width, - new_height, - resized_img.color().into(), - ) - .map_err(|e| format!("Failed to compress image: {e}"))?; - - Ok(buffer) - }) - .await - .map_err(|e| format!("Failed to compress image: {e}"))? -} - -// a typical recommended chunk size is 5MB (AWS min part size). -const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB -// const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // For non-final parts - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MultipartCompleteResponse<'a> { - video_id: &'a str, - upload_id: &'a str, - parts: &'a [UploadedPart], - #[serde(flatten)] - meta: Option, -} - -pub struct InstantMultipartUpload {} - -impl InstantMultipartUpload { - pub async fn run( - app: AppHandle, - video_id: String, - file_path: PathBuf, - pre_created_video: VideoUploadInfo, - realtime_video_done: Option>, - ) -> Result<(), String> { - use std::time::Duration; - - use tokio::time::sleep; - - // -------------------------------------------- - // basic constants and info for chunk approach - // -------------------------------------------- - let client = reqwest::Client::new(); - let s3_config = pre_created_video.config; - - let mut uploaded_parts = Vec::new(); - let mut part_number = 1; - let mut last_uploaded_position: u64 = 0; - let mut progress = UploadProgressUpdater::new(app.clone(), pre_created_video.id.clone()); - - // -------------------------------------------- - // initiate the multipart upload - // -------------------------------------------- - debug!("Initiating multipart upload for {video_id}..."); - let initiate_response = match app - .authed_api_request("/api/upload/multipart/initiate", |c, url| { - c.post(url) - .header("Content-Type", "application/json") - .json(&serde_json::json!({ - "videoId": s3_config.id, - "contentType": "video/mp4" - })) - }) - .await - { - Ok(r) => r, - Err(e) => { - return Err(format!("Failed to initiate multipart upload: {e}")); - } - }; - - if !initiate_response.status().is_success() { - let status = initiate_response.status(); - let error_body = initiate_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Failed to initiate multipart upload. Status: {status}. Body: {error_body}" - )); - } - - let initiate_data = match initiate_response.json::().await { - Ok(d) => d, - Err(e) => { - return Err(format!("Failed to parse initiate response: {e}")); - } - }; - - let upload_id = match initiate_data.get("uploadId") { - Some(val) => val.as_str().unwrap_or("").to_string(), - None => { - return Err("No uploadId returned from initiate endpoint".to_string()); - } - }; - - if upload_id.is_empty() { - return Err("Empty uploadId returned from initiate endpoint".to_string()); - } - - println!("Multipart upload initiated with ID: {upload_id}"); - - let mut realtime_is_done = realtime_video_done.as_ref().map(|_| false); - - // -------------------------------------------- - // Main loop while upload not complete: - // - If we have >= CHUNK_SIZE new data, upload. - // - If recording hasn't stopped, keep waiting. - // - If recording stopped, do leftover final(s). - // -------------------------------------------- - loop { - if !realtime_is_done.unwrap_or(true) - && let Some(realtime_video_done) = &realtime_video_done - { - match realtime_video_done.try_recv() { - Ok(_) => { - realtime_is_done = Some(true); - } - Err(flume::TryRecvError::Empty) => {} - _ => { - warn!("cancelling upload as realtime generation failed"); - return Err("cancelling upload as realtime generation failed".to_string()); - } - } - } - - // Check the file's current size - if !file_path.exists() { - println!("File no longer exists, aborting upload"); - return Err("File no longer exists".to_string()); - } - - let file_size = match tokio::fs::metadata(&file_path).await { - Ok(md) => md.len(), - Err(e) => { - println!("Failed to get file metadata: {e}"); - sleep(Duration::from_millis(500)).await; - continue; - } - }; - - let new_data_size = file_size - last_uploaded_position; - - if ((new_data_size >= CHUNK_SIZE) - || new_data_size > 0 && realtime_is_done.unwrap_or(false)) - || (realtime_is_done.is_none() && new_data_size > 0) - { - // We have a full chunk to send - match Self::upload_chunk( - &app, - &client, - &file_path, - &s3_config.id, - &upload_id, - &mut part_number, - &mut last_uploaded_position, - new_data_size.min(CHUNK_SIZE), - &mut progress, - ) - .await - { - Ok(part) => { - uploaded_parts.push(part); - } - Err(e) => { - println!( - "Error uploading chunk (part {part_number}): {e}. Retrying in 1s..." - ); - sleep(Duration::from_secs(1)).await; - } - } - } else if new_data_size == 0 && realtime_is_done.unwrap_or(true) { - if realtime_is_done.unwrap_or(false) { - info!("realtime video done, uploading header chunk"); - - let part = Self::upload_chunk( - &app, - &client, - &file_path, - &s3_config.id, - &upload_id, - &mut 1, - &mut 0, - uploaded_parts[0].size as u64, - &mut progress, - ) - .await - .map_err(|err| format!("Failed to re-upload first chunk: {err}"))?; - - uploaded_parts[0] = part; - println!("Successfully re-uploaded first chunk",); - } - - // All leftover chunks are now uploaded. We finalize. - println!( - "Completing multipart upload with {} parts", - uploaded_parts.len() - ); - Self::finalize_upload(&app, &file_path, &s3_config.id, &upload_id, &uploaded_parts) - .await?; - - break; - } else { - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - - // Copy link to clipboard early - let _ = app.clipboard().write_text(pre_created_video.link.clone()); - - Ok(()) - } - - /// Upload a single chunk from the file at `last_uploaded_position` for `chunk_size` bytes. - /// Advances `last_uploaded_position` accordingly. Returns JSON { PartNumber, ETag, Size }. - #[allow(clippy::too_many_arguments)] - async fn upload_chunk( - app: &AppHandle, - client: &reqwest::Client, - file_path: &PathBuf, - video_id: &str, - upload_id: &str, - part_number: &mut i32, - last_uploaded_position: &mut u64, - chunk_size: u64, - progress: &mut UploadProgressUpdater, - ) -> Result { - let file_size = match tokio::fs::metadata(file_path).await { - Ok(metadata) => metadata.len(), - Err(e) => return Err(format!("Failed to get file metadata: {e}")), - }; - - // Check if we're at the end of the file - if *last_uploaded_position >= file_size { - return Err("No more data to read - already at end of file".to_string()); - } - - // Calculate how much we can actually read - let remaining = file_size - *last_uploaded_position; - let bytes_to_read = std::cmp::min(chunk_size, remaining); - - let mut file = tokio::fs::File::open(file_path) - .await - .map_err(|e| format!("Failed to open file: {e}"))?; - - // Log before seeking - println!( - "Seeking to offset {} for part {} (file size: {}, remaining: {})", - *last_uploaded_position, *part_number, file_size, remaining - ); - - // Seek to the position we left off - if let Err(e) = file - .seek(std::io::SeekFrom::Start(*last_uploaded_position)) - .await - { - return Err(format!("Failed to seek in file: {e}")); - } - - // Read exactly bytes_to_read - let mut chunk = vec![0u8; bytes_to_read as usize]; - let mut total_read = 0; - - while total_read < bytes_to_read as usize { - match file.read(&mut chunk[total_read..]).await { - Ok(0) => break, // EOF - Ok(n) => { - total_read += n; - println!("Read {n} bytes, total so far: {total_read}/{bytes_to_read}"); - } - Err(e) => return Err(format!("Failed to read chunk from file: {e}")), - } - } - - if total_read == 0 { - return Err("No data to upload for this part.".to_string()); - } - - // Truncate the buffer to the actual bytes read - chunk.truncate(total_read); - - // Basic content‑MD5 for data integrity - let md5_sum = { - let digest = md5::compute(&chunk); - base64::encode(digest.0) - }; - - // Verify file position to ensure we're not experiencing file handle issues - let pos_after_read = file - .seek(std::io::SeekFrom::Current(0)) - .await - .map_err(|e| format!("Failed to get current file position: {e}"))?; - - let expected_pos = *last_uploaded_position + total_read as u64; - if pos_after_read != expected_pos { - println!( - "WARNING: File position after read ({pos_after_read}) doesn't match expected position ({expected_pos})" - ); - } - - let file_size = tokio::fs::metadata(file_path) - .await - .map(|m| m.len()) - .unwrap_or(0); - let remaining = file_size - *last_uploaded_position; - - println!( - "File size: {}, Last uploaded: {}, Remaining: {}, chunk_size: {}, part: {}", - file_size, *last_uploaded_position, remaining, chunk_size, *part_number - ); - println!( - "Uploading part {} ({} bytes), MD5: {}", - *part_number, total_read, md5_sum - ); - - // Request presigned URL for this part - let presign_response = match app - .authed_api_request("/api/upload/multipart/presign-part", |c, url| { - c.post(url) - .header("Content-Type", "application/json") - .json(&serde_json::json!({ - "videoId": video_id, - "uploadId": upload_id, - "partNumber": *part_number, - })) - }) - .await - { - Ok(r) => r, - Err(e) => { - return Err(format!( - "Failed to request presigned URL for part {}: {}", - *part_number, e - )); - } - }; - - progress.update(expected_pos, file_size); - - if !presign_response.status().is_success() { - let status = presign_response.status(); - let error_body = presign_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Presign-part failed for part {}: status={}, body={}", - *part_number, status, error_body - )); - } - - let presign_data = match presign_response.json::().await { - Ok(d) => d, - Err(e) => return Err(format!("Failed to parse presigned URL response: {e}")), - }; - - let presigned_url = presign_data - .get("presignedUrl") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - - if presigned_url.is_empty() { - return Err(format!("Empty presignedUrl for part {}", *part_number)); - } - - // Upload the chunk with retry - let mut retry_count = 0; - let max_retries = 3; - let mut etag: Option = None; - - while retry_count < max_retries && etag.is_none() { - println!( - "Sending part {} (attempt {}/{}): {} bytes", - *part_number, - retry_count + 1, - max_retries, - total_read - ); - - match client - .put(&presigned_url) - .timeout(Duration::from_secs(120)) - .body(chunk.clone()) - .send() - .await - { - Ok(upload_response) => { - if upload_response.status().is_success() { - if let Some(etag_val) = upload_response.headers().get("ETag") { - let e = etag_val - .to_str() - .unwrap_or("") - .trim_matches('"') - .to_string(); - println!("Received ETag {} for part {}", e, *part_number); - etag = Some(e); - } else { - println!("No ETag in response for part {}", *part_number); - retry_count += 1; - sleep(Duration::from_secs(2)).await; - } - } else { - println!( - "Failed part {} (status {}). Will retry if possible.", - *part_number, - upload_response.status() - ); - if let Ok(body) = upload_response.text().await { - println!("Error response: {body}"); - } - retry_count += 1; - sleep(Duration::from_secs(2)).await; - } - } - Err(e) => { - println!( - "Part {} upload error (attempt {}/{}): {}", - *part_number, - retry_count + 1, - max_retries, - e - ); - retry_count += 1; - sleep(Duration::from_secs(2)).await; - } - } - } - - let etag = match etag { - Some(e) => e, - None => { - return Err(format!( - "Failed to upload part {} after {} attempts", - *part_number, max_retries - )); - } - }; - - // Advance the global progress - *last_uploaded_position += total_read as u64; - println!( - "After upload: new last_uploaded_position is {} ({}% of file)", - *last_uploaded_position, - (*last_uploaded_position as f64 / file_size as f64 * 100.0) as u32 - ); - - let part = UploadedPart { - part_number: *part_number, - etag, - size: total_read, - }; - *part_number += 1; - Ok(part) - } - - /// Completes the multipart upload with the stored parts. - /// Logs a final location if the complete call is successful. - async fn finalize_upload( - app: &AppHandle, - file_path: &PathBuf, - video_id: &str, - upload_id: &str, - uploaded_parts: &[UploadedPart], - ) -> Result<(), String> { - println!( - "Completing multipart upload with {} parts", - uploaded_parts.len() - ); - - if uploaded_parts.is_empty() { - return Err("No parts uploaded before finalizing.".to_string()); - } - - let mut total_bytes_in_parts = 0; - for part in uploaded_parts { - let pn = part.part_number; - let size = part.size; - let etag = &part.etag; - total_bytes_in_parts += part.size; - println!("Part {pn}: {size} bytes (ETag: {etag})"); - } - - let file_final_size = tokio::fs::metadata(file_path) - .await - .map(|md| md.len()) - .unwrap_or(0); - - println!("Sum of all parts: {total_bytes_in_parts} bytes"); - println!("File size on disk: {file_final_size} bytes"); - println!("Proceeding with multipart upload completion..."); - - let metadata = build_video_meta(file_path) - .map_err(|e| error!("Failed to get video metadata: {e}")) - .ok(); - - let complete_response = match app - .authed_api_request("/api/upload/multipart/complete", |c, url| { - c.post(url).header("Content-Type", "application/json").json( - &MultipartCompleteResponse { - video_id, - upload_id, - parts: uploaded_parts, - meta: metadata, - }, - ) - }) - .await - { - Ok(response) => response, - Err(e) => { - return Err(format!("Failed to complete multipart upload: {e}")); - } - }; - - if !complete_response.status().is_success() { - let status = complete_response.status(); - let error_body = complete_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Failed to complete multipart upload. Status: {status}. Body: {error_body}" - )); - } - - let complete_data = match complete_response.json::().await { - Ok(d) => d, - Err(e) => { - return Err(format!("Failed to parse completion response: {e}")); - } - }; - - if let Some(location) = complete_data.get("location") { - println!("Multipart upload complete. Final S3 location: {location}"); - } else { - println!("Multipart upload complete. No 'location' in response."); - } - - println!("Multipart upload complete for {video_id}."); - Ok(()) - } -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct UploadedPart { - part_number: i32, - etag: String, - size: usize, -} diff --git a/apps/desktop/src-tauri/src/web_api.rs b/apps/desktop/src-tauri/src/web_api.rs index b87c610668..bdc6b405b4 100644 --- a/apps/desktop/src-tauri/src/web_api.rs +++ b/apps/desktop/src-tauri/src/web_api.rs @@ -1,8 +1,5 @@ use reqwest::StatusCode; -use serde::Serialize; -use specta::Type; use tauri::{Emitter, Manager, Runtime}; -use tauri_specta::Event; use thiserror::Error; use tracing::{error, warn}; @@ -20,13 +17,24 @@ pub enum AuthedApiError { #[error("AuthedApiError/AuthStore: {0}")] AuthStore(String), #[error("AuthedApiError/Request: {0}")] - Request(#[from] reqwest::Error), + Request(reqwest::Error), #[error("AuthedApiError/Deserialization: {0}")] Deserialization(#[from] serde_json::Error), + #[error("The request has timed out")] + Timeout, #[error("AuthedApiError/Other: {0}")] Other(String), } +impl From for AuthedApiError { + fn from(err: reqwest::Error) -> Self { + match err { + err if err.is_timeout() => AuthedApiError::Timeout, + err => AuthedApiError::Request(err), + } + } +} + impl From<&'static str> for AuthedApiError { fn from(value: &'static str) -> Self { AuthedApiError::Other(value.into()) @@ -129,8 +137,8 @@ impl + Emitter, R: Runtime> ManagerExt for T { } async fn is_server_url_custom(&self) -> bool { - let mut state = self.state::>(); - let mut app_state = state.read().await; + let state = self.state::>(); + let app_state = state.read().await; if let Some(env_url) = std::option_env!("VITE_SERVER_URL") { return app_state.server_url != env_url; diff --git a/apps/web/app/api/upload/[...route]/multipart.ts b/apps/web/app/api/upload/[...route]/multipart.ts index 7664f5027f..eb946b70a1 100644 --- a/apps/web/app/api/upload/[...route]/multipart.ts +++ b/apps/web/app/api/upload/[...route]/multipart.ts @@ -250,7 +250,7 @@ app.post( const maybeVideo = yield* videos.getById(Video.VideoId.make(videoId)); if (Option.isNone(maybeVideo)) { c.status(404); - return c.text("Video not found"); + return c.text(`Video '${encodeURIComponent(videoId)}' not found`); } const [video] = maybeVideo.value;