Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/desktop/src-tauri/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn upload_multipart_presign_part(
.map_err(|err| format!("api/upload_multipart_presign_part/request: {err}"))?;

if !resp.status().is_success() {
let status = resp.status();
let status = resp.status().as_u16();
let error_body = resp
.text()
.await
Expand Down Expand Up @@ -155,7 +155,7 @@ pub async fn upload_multipart_complete(
.map_err(|err| format!("api/upload_multipart_complete/request: {err}"))?;

if !resp.status().is_success() {
let status = resp.status();
let status = resp.status().as_u16();
let error_body = resp
.text()
.await
Expand Down Expand Up @@ -211,7 +211,7 @@ pub async fn upload_signed(
.map_err(|err| format!("api/upload_signed/request: {err}"))?;

if !resp.status().is_success() {
let status = resp.status();
let status = resp.status().as_u16();
let error_body = resp
.text()
.await
Expand Down Expand Up @@ -245,7 +245,7 @@ pub async fn desktop_video_progress(
.map_err(|err| format!("api/desktop_video_progress/request: {err}"))?;

if !resp.status().is_success() {
let status = resp.status();
let status = resp.status().as_u16();
let error_body = resp
.text()
.await
Expand Down
3 changes: 1 addition & 2 deletions apps/desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ mod target_select_overlay;
mod thumbnails;
mod tray;
mod upload;
mod upload_legacy;
mod web_api;
mod window_exclusion;
mod windows;
Expand Down Expand Up @@ -2598,8 +2597,8 @@ async fn resume_uploads(app: AppHandle) -> Result<(), String> {
app.clone(),
file_path,
pre_created_video,
None,
recording_dir,
None,
);
}
UploadMeta::SinglePartUpload {
Expand Down
38 changes: 33 additions & 5 deletions apps/desktop/src-tauri/src/posthog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,40 @@ use tracing::error;

#[derive(Debug)]
pub enum PostHogEvent {
MultipartUploadComplete { duration: Duration },
MultipartUploadFailed { duration: Duration, error: String },
MultipartUploadComplete {
// Upload duration
duration: Duration,
// Length of the video
length: Duration,
// Size of the file in megabytes
size: u64,
},
MultipartUploadFailed {
// Upload duration
duration: Duration,
// Error message
error: String,
},
}

impl From<PostHogEvent> for posthog_rs::Event {
fn from(event: PostHogEvent) -> Self {
match event {
PostHogEvent::MultipartUploadComplete { duration } => {
let mut e = match event {
PostHogEvent::MultipartUploadComplete {
duration,
length,
size,
} => {
let mut e = posthog_rs::Event::new_anon("multipart_upload_complete");
e.insert_prop("duration", duration.as_secs())
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();
e.insert_prop("length", length.as_secs())
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();
e.insert_prop("size", size)
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();
e
}
PostHogEvent::MultipartUploadFailed { duration, error } => {
Expand All @@ -27,7 +49,13 @@ impl From<PostHogEvent> for posthog_rs::Event {
.ok();
e
}
}
};

e.insert_prop("cap_version", env!("CARGO_PKG_VERSION"))
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();

e
}
}

Expand Down
51 changes: 29 additions & 22 deletions apps/desktop/src-tauri/src/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,14 +533,6 @@ pub async fn start_recording(
return Err("Video upload info not found".to_string());
};

let progressive_upload = InstantMultipartUpload::spawn(
app_handle,
recording_dir.join("content/output.mp4"),
video_upload_info.clone(),
Some(finish_upload_rx),
recording_dir.clone(),
);

let mut builder = instant_recording::Actor::builder(
recording_dir.clone(),
inputs.capture_target.clone(),
Expand All @@ -567,6 +559,14 @@ pub async fn start_recording(
e.to_string()
})?;

let progressive_upload = InstantMultipartUpload::spawn(
app_handle,
recording_dir.join("content/output.mp4"),
video_upload_info.clone(),
recording_dir.clone(),
Some(finish_upload_rx),
);

InProgressRecording::Instant {
handle,
progressive_upload,
Expand Down Expand Up @@ -734,25 +734,32 @@ pub async fn restart_recording(
pub async fn delete_recording(app: AppHandle, state: MutableState<'_, App>) -> Result<(), String> {
let recording_data = {
let mut app_state = state.write().await;
if let Some(recording) = app_state.clear_current_recording() {
let recording_dir = recording.recording_dir().clone();
let video_id = match &recording {
InProgressRecording::Instant {
video_upload_info, ..
} => Some(video_upload_info.id.clone()),
_ => None,
};
Some((recording, recording_dir, video_id))
} else {
None
}
app_state.clear_current_recording()
};

if let Some((_, recording_dir, video_id)) = recording_data {
if let Some(recording) = recording_data {
let recording_dir = recording.recording_dir().clone();
CurrentRecordingChanged.emit(&app).ok();
RecordingStopped {}.emit(&app).ok();

// let _ = recording.cancel().await;
let video_id = match &recording {
InProgressRecording::Instant {
video_upload_info,
progressive_upload,
..
} => {
debug!(
"User deleted recording. Aborting multipart upload for {:?}",
video_upload_info.id
);
progressive_upload.handle.abort();

Some(video_upload_info.id.clone())
}
_ => None,
};

let _ = recording.cancel().await;

std::fs::remove_dir_all(&recording_dir).ok();

Expand Down
79 changes: 56 additions & 23 deletions apps/desktop/src-tauri/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, BufReader},
task::{self, JoinHandle},
time::{self, Instant},
time::{self, Instant, timeout},
};
use tokio_util::io::ReaderStream;
use tracing::{Span, debug, error, info, instrument, trace};
use tracing::{Span, debug, error, info, info_span, instrument, trace};
use tracing_futures::Instrument;

pub struct UploadedItem {
Expand Down Expand Up @@ -65,7 +65,6 @@ pub async fn upload_video(
meta: S3VideoMeta,
channel: Option<Channel<UploadProgress>>,
) -> Result<UploadedItem, AuthedApiError> {
println!("Uploading video {video_id}...");
info!("Uploading video {video_id}...");

let start = Instant::now();
Expand Down Expand Up @@ -103,9 +102,10 @@ pub async fn upload_video(
.map_err(|e| error!("Failed to get video metadata: {e}"))
.ok();

api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata).await?;
api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata.clone())
.await?;

Ok(())
Ok(metadata)
};

// TODO: We don't report progress on image upload
Expand All @@ -126,8 +126,15 @@ pub async fn upload_video(
tokio::join!(video_fut, thumbnail_fut);

async_capture_event(match &video_result {
Ok(()) => PostHogEvent::MultipartUploadComplete {
Ok(meta) => PostHogEvent::MultipartUploadComplete {
duration: start.elapsed(),
length: meta
.as_ref()
.map(|v| Duration::from_secs(v.duration_in_secs as u64))
.unwrap_or_default(),
size: std::fs::metadata(file_path)
.map(|m| ((m.len() as f64) / 1_000_000.0) as u64)
.unwrap_or_default(),
},
Err(err) => PostHogEvent::MultipartUploadFailed {
duration: start.elapsed(),
Expand Down Expand Up @@ -229,13 +236,21 @@ pub async fn create_or_get_video(
error: String,
}

if let Ok(error) = response.json::<CreateErrorResponse>().await {
let status = response.status();
let body = response.text().await;

if let Some(error) = body
.as_ref()
.ok()
.and_then(|body| serde_json::from_str::<CreateErrorResponse>(&*body).ok())
&& status == StatusCode::FORBIDDEN
{
if error.error == "upgrade_required" {
return Err(AuthedApiError::UpgradeRequired);
}
}

return Err("Unknown error uploading video".into());
return Err(format!("create_or_get_video/error/{status}: {body:?}").into());
}

let response_text = response
Expand Down Expand Up @@ -318,23 +333,30 @@ impl InstantMultipartUpload {
app: AppHandle,
file_path: PathBuf,
pre_created_video: VideoUploadInfo,
realtime_upload_done: Option<Receiver<()>>,
recording_dir: PathBuf,
realtime_upload_done: Option<Receiver<()>>,
) -> Self {
Self {
handle: spawn_actor(async move {
let start = Instant::now();
let result = Self::run(
app,
file_path,
file_path.clone(),
pre_created_video,
realtime_upload_done,
recording_dir,
realtime_upload_done,
)
.await;
async_capture_event(match &result {
Ok(()) => PostHogEvent::MultipartUploadComplete {
Ok(meta) => PostHogEvent::MultipartUploadComplete {
duration: start.elapsed(),
length: meta
.as_ref()
.map(|v| Duration::from_secs(v.duration_in_secs as u64))
.unwrap_or_default(),
size: std::fs::metadata(file_path)
.map(|m| ((m.len() as f64) / 1_000_000.0) as u64)
.unwrap_or_default(),
},
Err(err) => PostHogEvent::MultipartUploadFailed {
duration: start.elapsed(),
Expand All @@ -351,9 +373,9 @@ impl InstantMultipartUpload {
app: AppHandle,
file_path: PathBuf,
pre_created_video: VideoUploadInfo,
realtime_video_done: Option<Receiver<()>>,
recording_dir: PathBuf,
) -> Result<(), AuthedApiError> {
realtime_video_done: Option<Receiver<()>>,
) -> Result<Option<S3VideoMeta>, AuthedApiError> {
let video_id = pre_created_video.id.clone();
debug!("Initiating multipart upload for {video_id}...");

Expand Down Expand Up @@ -398,7 +420,8 @@ impl InstantMultipartUpload {
.map_err(|e| error!("Failed to get video metadata: {e}"))
.ok();

api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata).await?;
api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata.clone())
.await?;
info!("Multipart upload complete for {video_id}.");

let mut project_meta = RecordingMeta::load_for_project(&recording_dir).map_err(|err| {
Expand All @@ -411,7 +434,7 @@ impl InstantMultipartUpload {

let _ = app.clipboard().write_text(pre_created_video.link.clone());

Ok(())
Ok(metadata)
}
}

Expand Down Expand Up @@ -459,7 +482,17 @@ pub fn from_pending_file_to_chunks(
realtime_upload_done: Option<Receiver<()>>,
) -> impl Stream<Item = io::Result<Chunk>> {
try_stream! {
let mut file = tokio::fs::File::open(&path).await?;
let mut file = timeout(Duration::from_secs(20), async move {
loop {
if let Ok(file) = tokio::fs::File::open(&path).await.map_err(|err| error!("from_pending_file_to_chunks/open: {err:?}")) {
break file;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to open file. The recording pipeline may have crashed?"))?;

let mut part_number = 1;
let mut last_read_position: u64 = 0;
let mut realtime_is_done = realtime_upload_done.as_ref().map(|_| false);
Expand All @@ -473,10 +506,10 @@ pub fn from_pending_file_to_chunks(
match realtime_receiver.try_recv() {
Ok(_) => realtime_is_done = Some(true),
Err(flume::TryRecvError::Empty) => {},
Err(_) => yield Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Realtime generation failed"
))?,
// This means all senders where dropped.
// This can assume this means realtime is done.
// It possibly means something has gone wrong but that's not the uploader's problem.
Err(_) => realtime_is_done = Some(true),
}
}
}
Expand Down Expand Up @@ -646,14 +679,15 @@ fn multipart_uploader(
.map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
.put(&presigned_url)
.header("Content-Length", chunk.len())
.timeout(Duration::from_secs(120)).body(chunk);
.timeout(Duration::from_secs(5 * 60)).body(chunk);

if let Some(md5_sum) = &md5_sum {
req = req.header("Content-MD5", md5_sum);
}

let resp = req
.send()
.instrument(info_span!("send", size = size))
.await
.map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;

Expand Down Expand Up @@ -698,7 +732,6 @@ pub async fn singlepart_uploader(
.map_err(|err| format!("singlepart_uploader/client: {err:?}"))?
.put(&presigned_url)
.header("Content-Length", total_size)
.timeout(Duration::from_secs(120))
.body(reqwest::Body::wrap_stream(stream))
.send()
.await
Expand Down
Loading
Loading