Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 40 additions & 31 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ mod shadow;
mod upload;

use control::protocol::{FactoryInfo, PayloadReq, PayloadRes};
use exec::ExitDetails;

struct Agent {
log: Logger,
Expand Down Expand Up @@ -613,7 +612,7 @@ impl ClientWrap {
&self,
name: &str,
script: &str,
) -> Option<mpsc::Receiver<exec::Activity>> {
) -> Result<mpsc::Receiver<exec::Activity>> {
self.append_worker_msg(
name,
&format!("starting {name}-job diagnostics"),
Expand All @@ -626,8 +625,7 @@ impl ClientWrap {
let s = match write_script(script) {
Ok(s) => s,
Err(e) => {
error!(self.log, "writing {name}-job diagnostic script: {e}");
return None;
bail!("writing {name}-job diagnostic script: {e}");
}
};

Expand Down Expand Up @@ -659,7 +657,7 @@ impl ClientWrap {
cmd.gid(0);

match exec::run_diagnostic(cmd, name) {
Ok(c) => Some(c),
Ok(c) => Ok(c),
Err(e) => {
/*
* Try to post the error we would have reported to the server,
Expand All @@ -670,13 +668,13 @@ impl ClientWrap {
.body(vec![WorkerAppend {
stream: "agent".into(),
time: Utc::now(),
payload: format!("ERROR: diag.{name} exec: {:?}", e),
payload: format!("ERROR: diag.{name} exec: {e:?}"),
}])
.send()
.await
.ok();

None
bail!("failed to exec diag {name}: {e:?}")
}
}
}
Expand Down Expand Up @@ -1208,8 +1206,7 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {

let mut tasks: VecDeque<WorkerPingTask> = VecDeque::new();
let mut stage = Stage::Ready;
let mut exit_details: Vec<ExitDetails> = Vec::new();
let mut upload_errors = false;
let mut job_failed = false;

let mut pingfreq = tokio::time::interval(Duration::from_secs(5));
pingfreq.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand Down Expand Up @@ -1482,12 +1479,17 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
continue;
}
Stage::StartPreDiagnostics(script) => {
if let Some(c) =
cw.start_diag_script("pre", script.as_str()).await
{
stage = Stage::PreDiagnostics(c, None);
} else {
stage = Stage::Ready;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this behavior was wrong, and that we shouldn't mark the worker as ready if the pre-diagnostics script fails to execute.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was, I believe, a deliberate choice. The pre-diagnostic scripts were something I added while debugging various bits of AWS-induced malaise, but I was generally leaning to failing open in this case if we weren't able to get it started.

It seems fine to tighten it up, though; we'll just need to be careful not to misconfigure the pre-diag stuff in such a way that we hit this condition.

match cw.start_diag_script("pre", script.as_str()).await {
Ok(c) => stage = Stage::PreDiagnostics(c, None),
Err(e) => {
error!(log, "pre-diagnostics start failed: {e:?}");
cw.report_failure(Some(
"failed to start pre-diagnostics",
))
.await;
job_failed = true;
stage = Stage::Broken;
}
}
}
Stage::PreDiagnostics(ch, failed) => {
Expand All @@ -1512,7 +1514,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* mark the worker as failed.
*/
*failed = Some(ex.failed());
exit_details.push(ex);
if ex.failed() {
job_failed = true;
}
}
Some(exec::Activity::Complete) if failed.unwrap() => {
/*
Expand Down Expand Up @@ -1547,12 +1551,11 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* still attempt to upload output files before we complete
* the job.
*/
let failures = exit_details.iter().any(|ex| ex.failed());
if failures {
if job_failed {
error!(log, "aborting after failed task");
}

if failures || tasks.is_empty() {
if job_failed || tasks.is_empty() {
/*
* There are no more tasks to complete, so move on to
* uploading outputs.
Expand Down Expand Up @@ -1635,6 +1638,8 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
stage = Stage::Child(c, t, None);
}
Err(e) => {
job_failed = true;

/*
* Try to post the error we would have reported
* to the server, but don't try too hard.
Expand Down Expand Up @@ -1681,7 +1686,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* to execute any subsequent tasks.
*/
*failed = Some(ex.failed());
exit_details.push(ex);
if ex.failed() {
job_failed = true;
}
}
Some(exec::Activity::Complete) => {
/*
Expand Down Expand Up @@ -1776,14 +1783,12 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
.await;
}
Some(upload::Activity::Complete) => {
let failed = upload_errors
|| exit_details.iter().any(|ex| ex.failed());
cw.job_complete(failed).await;
cw.job_complete(job_failed).await;
stage = Stage::StartPostDiagnostics;
}
Some(upload::Activity::Error(s)) => {
cw.append_msg(&format!("upload error: {}", s)).await;
upload_errors = true;
job_failed = true;
}
Some(upload::Activity::Warning(s)) => {
cw.append_msg(&format!("upload warning: {}", s)).await;
Expand All @@ -1801,12 +1806,14 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* The factory gave us a post-diagnostic script to run
* and the server will wait for us to run it.
*/
if let Some(c) = cw.start_diag_script("post", script).await
{
stage = Stage::PostDiagnostics(c, None);
} else {
cw.diagnostics_complete(false).await;
stage = Stage::Complete;
Comment on lines -1804 to -1809
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not failing here if the post diagnostics couldn't get underway was definitely more of a deliberate choice, FWIW, as it means a job that had succeeded might then fail for what didn't seem like great reasons, etc. It's probably fine to tighten up, though, and we'll see how it goes!

match cw.start_diag_script("post", script).await {
Ok(c) => stage = Stage::PostDiagnostics(c, None),
Err(e) => {
error!(log, "post-diagnostics start failed: {e:?}");
cw.diagnostics_complete(false).await;
job_failed = true;
stage = Stage::Complete;
}
}
} else {
stage = Stage::Complete;
Expand Down Expand Up @@ -1835,7 +1842,9 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* mark the worker as held.
*/
*failed = Some(ex.failed());
exit_details.push(ex);
if ex.failed() {
job_failed = true;
}
}
Some(exec::Activity::Complete) => {
/*
Expand Down