agent: fail the job if process spawning fails #102
```diff
@@ -36,7 +36,6 @@ mod shadow;
 mod upload;
 
 use control::protocol::{FactoryInfo, PayloadReq, PayloadRes};
-use exec::ExitDetails;
 
 struct Agent {
     log: Logger,
```
```diff
@@ -613,7 +612,7 @@ impl ClientWrap {
         &self,
         name: &str,
         script: &str,
-    ) -> Option<mpsc::Receiver<exec::Activity>> {
+    ) -> Result<mpsc::Receiver<exec::Activity>> {
         self.append_worker_msg(
             name,
             &format!("starting {name}-job diagnostics"),
```
```diff
@@ -626,8 +625,7 @@ impl ClientWrap {
         let s = match write_script(script) {
             Ok(s) => s,
             Err(e) => {
-                error!(self.log, "writing {name}-job diagnostic script: {e}");
-                return None;
+                bail!("writing {name}-job diagnostic script: {e}");
             }
         };
 
```
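For context: `bail!` here is presumably `anyhow::bail!`, which fits the bare `Result<T>` return type introduced above; it formats an error and returns `Err` immediately, so callers now learn why startup failed instead of receiving a bare `None`. A minimal sketch of the pattern, with `write_script` replaced by a hypothetical stand-in:

```rust
use anyhow::{bail, Result};

// Hypothetical stand-in for the agent's write_script() helper.
fn write_script(script: &str) -> std::io::Result<String> {
    if script.is_empty() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "empty script",
        ));
    }
    Ok("/tmp/diag.sh".to_string())
}

fn start_diag_script(name: &str, script: &str) -> Result<String> {
    let s = match write_script(script) {
        Ok(s) => s,
        // bail!() expands to `return Err(anyhow!(...))`: the formatted
        // message propagates to the caller instead of a bare None.
        Err(e) => bail!("writing {name}-job diagnostic script: {e}"),
    };
    Ok(s)
}

fn main() {
    // Failure now carries its reason with it.
    println!("{:?}", start_diag_script("pre", "")); // Err(writing pre-job ...)
    println!("{:?}", start_diag_script("pre", "echo ok")); // Ok("/tmp/diag.sh")
}
```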
```diff
@@ -659,7 +657,7 @@ impl ClientWrap {
         cmd.gid(0);
 
         match exec::run_diagnostic(cmd, name) {
-            Ok(c) => Some(c),
+            Ok(c) => Ok(c),
             Err(e) => {
                 /*
                  * Try to post the error we would have reported to the server,
```
```diff
@@ -670,13 +668,13 @@ impl ClientWrap {
                     .body(vec![WorkerAppend {
                         stream: "agent".into(),
                         time: Utc::now(),
-                        payload: format!("ERROR: diag.{name} exec: {:?}", e),
+                        payload: format!("ERROR: diag.{name} exec: {e:?}"),
                     }])
                     .send()
                     .await
                     .ok();
 
-                None
+                bail!("failed to exec diag {name}: {e:?}")
             }
         }
     }
```
```diff
@@ -1208,8 +1206,7 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
 
     let mut tasks: VecDeque<WorkerPingTask> = VecDeque::new();
     let mut stage = Stage::Ready;
-    let mut exit_details: Vec<ExitDetails> = Vec::new();
-    let mut upload_errors = false;
+    let mut job_failed = false;
 
     let mut pingfreq = tokio::time::interval(Duration::from_secs(5));
     pingfreq.set_missed_tick_behavior(MissedTickBehavior::Skip);
```
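The bookkeeping change in miniature: the loop previously retained every `ExitDetails` plus a separate `upload_errors` flag and derived the job's fate on demand; since nothing else read the stored exit details, a single latched boolean is equivalent. A sketch of that equivalence, with `ExitDetails` reduced to a hypothetical stub:

```rust
// Hypothetical stub for exec::ExitDetails; the real type carries more.
struct ExitDetails {
    code: i32,
}

impl ExitDetails {
    fn failed(&self) -> bool {
        self.code != 0
    }
}

fn main() {
    let exits = vec![ExitDetails { code: 0 }, ExitDetails { code: 2 }];
    let upload_errors = false;

    // Old scheme: keep everything, derive the verdict when needed.
    let old_failed = upload_errors || exits.iter().any(|ex| ex.failed());

    // New scheme: latch one flag at every failure site as it happens.
    let mut job_failed = upload_errors;
    for ex in &exits {
        if ex.failed() {
            job_failed = true;
        }
    }

    assert_eq!(old_failed, job_failed);
    println!("job failed: {job_failed}");
}
```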
```diff
@@ -1482,12 +1479,17 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                     continue;
                 }
                 Stage::StartPreDiagnostics(script) => {
-                    if let Some(c) =
-                        cw.start_diag_script("pre", script.as_str()).await
-                    {
-                        stage = Stage::PreDiagnostics(c, None);
-                    } else {
-                        stage = Stage::Ready;
+                    match cw.start_diag_script("pre", script.as_str()).await {
+                        Ok(c) => stage = Stage::PreDiagnostics(c, None),
+                        Err(e) => {
+                            error!(log, "pre-diagnostics start failed: {e:?}");
+                            cw.report_failure(Some(
+                                "failed to start pre-diagnostics",
+                            ))
+                            .await;
+                            job_failed = true;
+                            stage = Stage::Broken;
+                        }
                     }
                 }
                 Stage::PreDiagnostics(ch, failed) => {
```
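To situate the `Stage::Broken` transition: `Stage` is the main loop's state enum, and this hunk replaces the old fail-open fallback to `Ready` with an explicit failure path. A stripped-down model keeping only the behavior visible in this hunk; the variant payloads and `start_diag_script` are simplified stand-ins, not the real signatures:

```rust
// Reduced model of the agent's stage machine; the real variants carry
// channels and scripts, and the real calls are async.
#[derive(Debug)]
enum Stage {
    StartPreDiagnostics(String),
    PreDiagnostics, // really PreDiagnostics(mpsc::Receiver<_>, Option<bool>)
    Broken,
}

// Hypothetical stand-in for cw.start_diag_script("pre", ...).
fn start_diag_script(script: &str) -> Result<(), String> {
    if script.is_empty() {
        Err("process spawning failed".into())
    } else {
        Ok(())
    }
}

fn step(stage: Stage, job_failed: &mut bool) -> Stage {
    match stage {
        Stage::StartPreDiagnostics(script) => {
            match start_diag_script(&script) {
                Ok(()) => Stage::PreDiagnostics,
                Err(e) => {
                    // The old behavior fell back to Stage::Ready (fail open),
                    // so a worker could accept a job it could not diagnose.
                    eprintln!("pre-diagnostics start failed: {e:?}");
                    *job_failed = true;
                    Stage::Broken
                }
            }
        }
        other => other,
    }
}

fn main() {
    let mut job_failed = false;
    let next = step(Stage::StartPreDiagnostics(String::new()), &mut job_failed);
    println!("{next:?}, job_failed={job_failed}"); // Broken, job_failed=true
}
```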
```diff
@@ -1512,7 +1514,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                              * mark the worker as failed.
                              */
                             *failed = Some(ex.failed());
-                            exit_details.push(ex);
+                            if ex.failed() {
+                                job_failed = true;
+                            }
                         }
                         Some(exec::Activity::Complete) if failed.unwrap() => {
                             /*
```
```diff
@@ -1547,12 +1551,11 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                      * still attempt to upload output files before we complete
                      * the job.
                      */
-                    let failures = exit_details.iter().any(|ex| ex.failed());
-                    if failures {
+                    if job_failed {
                         error!(log, "aborting after failed task");
                     }
 
-                    if failures || tasks.is_empty() {
+                    if job_failed || tasks.is_empty() {
                         /*
                          * There are no more tasks to complete, so move on to
                          * uploading outputs.
```
```diff
@@ -1635,6 +1638,8 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                             stage = Stage::Child(c, t, None);
                         }
                         Err(e) => {
+                            job_failed = true;
+
                             /*
                              * Try to post the error we would have reported
                              * to the server, but don't try too hard.
```
```diff
@@ -1681,7 +1686,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                              * to execute any subsequent tasks.
                              */
                             *failed = Some(ex.failed());
-                            exit_details.push(ex);
+                            if ex.failed() {
+                                job_failed = true;
+                            }
                         }
                         Some(exec::Activity::Complete) => {
                             /*
```
```diff
@@ -1776,14 +1783,12 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                                 .await;
                         }
                         Some(upload::Activity::Complete) => {
-                            let failed = upload_errors
-                                || exit_details.iter().any(|ex| ex.failed());
-                            cw.job_complete(failed).await;
+                            cw.job_complete(job_failed).await;
                             stage = Stage::StartPostDiagnostics;
                         }
                         Some(upload::Activity::Error(s)) => {
                             cw.append_msg(&format!("upload error: {}", s)).await;
-                            upload_errors = true;
+                            job_failed = true;
                         }
                         Some(upload::Activity::Warning(s)) => {
                             cw.append_msg(&format!("upload warning: {}", s)).await;
```
```diff
@@ -1801,12 +1806,14 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                      * The factory gave us a post-diagnostic script to run
                      * and the server will wait for us to run it.
                      */
-                    if let Some(c) = cw.start_diag_script("post", script).await
-                    {
-                        stage = Stage::PostDiagnostics(c, None);
-                    } else {
-                        cw.diagnostics_complete(false).await;
-                        stage = Stage::Complete;
+                    match cw.start_diag_script("post", script).await {
+                        Ok(c) => stage = Stage::PostDiagnostics(c, None),
+                        Err(e) => {
+                            error!(log, "post-diagnostics start failed: {e:?}");
+                            cw.diagnostics_complete(false).await;
+                            job_failed = true;
+                            stage = Stage::Complete;
+                        }
                     }
                 } else {
                     stage = Stage::Complete;
```

Comment on lines -1804 to -1809 (Collaborator): I think not failing here if the post diagnostics couldn't get underway was definitely more of a deliberate choice, FWIW, as it means a job that had succeeded might then fail for what didn't seem like great reasons, etc. It's probably fine to tighten up, though, and we'll see how it goes!
|
```diff
@@ -1835,7 +1842,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
                              * mark the worker as held.
                              */
                             *failed = Some(ex.failed());
-                            exit_details.push(ex);
+                            if ex.failed() {
+                                job_failed = true;
+                            }
                         }
                         Some(exec::Activity::Complete) => {
                             /*
```
Comment: I assume this behavior was wrong, and that we shouldn't mark the worker as ready if the pre-diagnostics script fails to execute.

Reply: It was, I believe, a deliberate choice. The pre-diagnostic scripts were something I added while debugging various bits of AWS-induced malaise, but I was generally leaning toward failing open in this case if we weren't able to get it started. It seems fine to tighten it up, though; we'll just need to be careful not to misconfigure the pre-diag stuff in such a way that we hit this condition.