diff --git a/agent/src/main.rs b/agent/src/main.rs index 71495870..3d677fc7 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -36,7 +36,6 @@ mod shadow; mod upload; use control::protocol::{FactoryInfo, PayloadReq, PayloadRes}; -use exec::ExitDetails; struct Agent { log: Logger, @@ -613,7 +612,7 @@ impl ClientWrap { &self, name: &str, script: &str, - ) -> Option> { + ) -> Result> { self.append_worker_msg( name, &format!("starting {name}-job diagnostics"), @@ -626,8 +625,7 @@ impl ClientWrap { let s = match write_script(script) { Ok(s) => s, Err(e) => { - error!(self.log, "writing {name}-job diagnostic script: {e}"); - return None; + bail!("writing {name}-job diagnostic script: {e}"); } }; @@ -659,7 +657,7 @@ impl ClientWrap { cmd.gid(0); match exec::run_diagnostic(cmd, name) { - Ok(c) => Some(c), + Ok(c) => Ok(c), Err(e) => { /* * Try to post the error we would have reported to the server, @@ -670,13 +668,13 @@ impl ClientWrap { .body(vec![WorkerAppend { stream: "agent".into(), time: Utc::now(), - payload: format!("ERROR: diag.{name} exec: {:?}", e), + payload: format!("ERROR: diag.{name} exec: {e:?}"), }]) .send() .await .ok(); - None + bail!("failed to exec diag {name}: {e:?}") } } } @@ -1208,8 +1206,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { let mut tasks: VecDeque = VecDeque::new(); let mut stage = Stage::Ready; - let mut exit_details: Vec = Vec::new(); - let mut upload_errors = false; + let mut job_failed = false; let mut pingfreq = tokio::time::interval(Duration::from_secs(5)); pingfreq.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -1482,12 +1479,17 @@ async fn cmd_run(mut l: Level) -> Result<()> { continue; } Stage::StartPreDiagnostics(script) => { - if let Some(c) = - cw.start_diag_script("pre", script.as_str()).await - { - stage = Stage::PreDiagnostics(c, None); - } else { - stage = Stage::Ready; + match cw.start_diag_script("pre", script.as_str()).await { + Ok(c) => stage = Stage::PreDiagnostics(c, None), + Err(e) => { + error!(log, "pre-diagnostics start failed: {e:?}"); + cw.report_failure(Some( + "failed to start pre-diagnostics", + )) + .await; + job_failed = true; + stage = Stage::Broken; + } } } Stage::PreDiagnostics(ch, failed) => { @@ -1512,7 +1514,9 @@ async fn cmd_run(mut l: Level) -> Result<()> { * mark the worker as failed. */ *failed = Some(ex.failed()); - exit_details.push(ex); + if ex.failed() { + job_failed = true; + } } Some(exec::Activity::Complete) if failed.unwrap() => { /* @@ -1547,12 +1551,11 @@ async fn cmd_run(mut l: Level) -> Result<()> { * still attempt to upload output files before we complete * the job. */ - let failures = exit_details.iter().any(|ex| ex.failed()); - if failures { + if job_failed { error!(log, "aborting after failed task"); } - if failures || tasks.is_empty() { + if job_failed || tasks.is_empty() { /* * There are no more tasks to complete, so move on to * uploading outputs. @@ -1635,6 +1638,8 @@ async fn cmd_run(mut l: Level) -> Result<()> { stage = Stage::Child(c, t, None); } Err(e) => { + job_failed = true; + /* * Try to post the error we would have reported * to the server, but don't try too hard. @@ -1681,7 +1686,9 @@ async fn cmd_run(mut l: Level) -> Result<()> { * to execute any subsequent tasks. */ *failed = Some(ex.failed()); - exit_details.push(ex); + if ex.failed() { + job_failed = true; + } } Some(exec::Activity::Complete) => { /* @@ -1776,14 +1783,12 @@ async fn cmd_run(mut l: Level) -> Result<()> { .await; } Some(upload::Activity::Complete) => { - let failed = upload_errors - || exit_details.iter().any(|ex| ex.failed()); - cw.job_complete(failed).await; + cw.job_complete(job_failed).await; stage = Stage::StartPostDiagnostics; } Some(upload::Activity::Error(s)) => { cw.append_msg(&format!("upload error: {}", s)).await; - upload_errors = true; + job_failed = true; } Some(upload::Activity::Warning(s)) => { cw.append_msg(&format!("upload warning: {}", s)).await; @@ -1801,12 +1806,14 @@ async fn cmd_run(mut l: Level) -> Result<()> { * The factory gave us a post-diagnostic script to run * and the server will wait for us to run it. */ - if let Some(c) = cw.start_diag_script("post", script).await - { - stage = Stage::PostDiagnostics(c, None); - } else { - cw.diagnostics_complete(false).await; - stage = Stage::Complete; + match cw.start_diag_script("post", script).await { + Ok(c) => stage = Stage::PostDiagnostics(c, None), + Err(e) => { + error!(log, "post-diagnostics start failed: {e:?}"); + cw.diagnostics_complete(false).await; + job_failed = true; + stage = Stage::Complete; + } } } else { stage = Stage::Complete; @@ -1835,7 +1842,9 @@ async fn cmd_run(mut l: Level) -> Result<()> { * mark the worker as held. */ *failed = Some(ex.failed()); - exit_details.push(ex); + if ex.failed() { + job_failed = true; + } } Some(exec::Activity::Complete) => { /*