Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ version = "0.0.0"
[workspace.lints.clippy]
identity_op = "allow"
many_single_char_names = "allow"
should_implement_trait = "allow"
too_many_arguments = "allow"
type_complexity = "allow"
vec_init_then_push = "allow"
Expand Down
158 changes: 79 additions & 79 deletions agent/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use std::time::{Duration, Instant};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use anyhow::{anyhow, bail, Result};
use buildomat_common::JobStream;
use chrono::prelude::*;

use super::OutputRecord;

fn spawn_reader<T>(
tx: Sender<Activity>,
name: String,
name: JobStream,
stream: Option<T>,
) -> Option<std::thread::JoinHandle<()>>
where
Expand Down Expand Up @@ -47,7 +48,10 @@ where
let s = String::from_utf8_lossy(&buf);

if tx
.blocking_send(Activity::msg(&name, s.trim_end()))
.blocking_send(Activity::msg(
name.clone(),
s.trim_end(),
))
.is_err()
{
/*
Expand All @@ -63,7 +67,7 @@ where
* server, but don't panic if we cannot.
*/
tx.blocking_send(Activity::msg(
"error",
JobStream::Error,
Comment on lines -66 to +70
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only occurrence of error as a job stream, and nothing else in the codebase handled it. Should this be panic instead?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of handling in other places is likely just because I've never actually seen it happen in practice. It definitely should not be panic, though; that's for, e.g., kernel panic stacks that occur while your job is running.

I believe we would just have printed with generic debug-ish formatting if it had showed up in a stream, at any rate.

&format!("failed to read {name}: {e:?}"),
))
.ok();
Expand All @@ -76,7 +80,7 @@ where

#[derive(Debug)]
pub struct ExitDetails {
stream: String,
stream: JobStream,
duration_ms: u64,
when: DateTime<Utc>,
code: i32,
Expand All @@ -85,7 +89,7 @@ pub struct ExitDetails {
impl ExitDetails {
pub(crate) fn to_record(&self) -> OutputRecord {
OutputRecord {
stream: self.stream.to_string(),
stream: self.stream.clone(),
msg: format!(
"process exited: duration {} ms, exit code {}",
self.duration_ms, self.code
Expand All @@ -101,15 +105,15 @@ impl ExitDetails {

#[derive(Clone, Debug)]
pub struct OutputDetails {
stream: String,
stream: JobStream,
msg: String,
time: DateTime<Utc>,
}

impl OutputDetails {
pub(crate) fn to_record(&self) -> OutputRecord {
OutputRecord {
stream: self.stream.to_string(),
stream: self.stream.clone(),
msg: self.msg.to_string(),
time: self.time,
}
Expand All @@ -123,69 +127,94 @@ pub enum Activity {
Complete,
}

struct ActivityBuilder {
error_stream: String,
exit_stream: String,
bgproc: Option<String>,
#[derive(Clone)]
pub enum ActivityBuilder {
Task,
Diag(String),
Bg(String),
}

impl ActivityBuilder {
fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity {
Activity::Exit(ExitDetails {
stream: self.exit_stream.to_string(),
duration_ms: end.duration_since(*start).as_millis() as u64,
when: Utc::now(),
code,
})
fn stdout_stream(&self) -> JobStream {
match self.clone() {
ActivityBuilder::Task => JobStream::Stdout,
ActivityBuilder::Diag(_) => JobStream::Stdout,
ActivityBuilder::Bg(name) => JobStream::BgStdout { name },
}
}

fn stderr_stream(&self) -> JobStream {
match self.clone() {
ActivityBuilder::Task => JobStream::Stderr,
ActivityBuilder::Diag(_) => JobStream::Stderr,
ActivityBuilder::Bg(name) => JobStream::BgStderr { name },
}
}

fn stdout_stream(&self) -> String {
if let Some(n) = &self.bgproc {
format!("bg.{n}.stdout")
} else {
"stdout".to_string()
fn exit_stream(&self) -> JobStream {
match self.clone() {
ActivityBuilder::Task => JobStream::Task,
ActivityBuilder::Diag(name) => JobStream::Diag { name },
ActivityBuilder::Bg(name) => JobStream::Bg { name },
}
}

fn stderr_stream(&self) -> String {
if let Some(n) = &self.bgproc {
format!("bg.{n}.stderr")
} else {
"stderr".to_string()
fn error_stream(&self) -> JobStream {
match self.clone() {
ActivityBuilder::Task => JobStream::Worker,
ActivityBuilder::Diag(name) => JobStream::Diag { name },
ActivityBuilder::Bg(name) => JobStream::Bg { name },
}
}

fn is_bg(&self) -> bool {
matches!(self, ActivityBuilder::Bg(_))
}

fn errmsg(&self, pfx: &str, msg: &str) -> String {
let mut s = format!("{pfx}: ");
if let Some(bg) = &self.bgproc {
s += &format!("background process {bg:?}: ");
match self {
ActivityBuilder::Task => {}
ActivityBuilder::Diag(_) => {}
ActivityBuilder::Bg(name) => {
s += &format!("background process {name:?}: ")
}
}
s += ": ";
s += msg;
s
}

fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity {
Activity::Exit(ExitDetails {
stream: self.exit_stream(),
duration_ms: end.duration_since(*start).as_millis() as u64,
when: Utc::now(),
code,
})
}

fn err(&self, msg: &str) -> Activity {
Activity::Output(OutputDetails {
stream: self.error_stream.to_string(),
stream: self.error_stream(),
msg: self.errmsg("exec error", msg),
time: Utc::now(),
})
}

fn warn(&self, msg: &str) -> Activity {
Activity::Output(OutputDetails {
stream: self.error_stream.to_string(),
stream: self.error_stream(),
msg: self.errmsg("exec warning", msg),
time: Utc::now(),
})
}
}

impl Activity {
fn msg(stream: &str, msg: &str) -> Activity {
fn msg(stream: JobStream, msg: &str) -> Activity {
Activity::Output(OutputDetails {
stream: stream.to_string(),
stream,
msg: msg.to_string(),
time: Utc::now(),
})
Expand Down Expand Up @@ -229,39 +258,13 @@ pub fn thread_done(
}
}

pub fn run_diagnostic(cmd: Command, name: &str) -> Result<Receiver<Activity>> {
pub fn run(cmd: Command, ab: ActivityBuilder) -> Result<Receiver<Activity>> {
let (tx, rx) = channel::<Activity>(100);

run_common(
cmd,
ActivityBuilder {
error_stream: format!("diag.{name}"),
exit_stream: format!("diag.{name}"),
bgproc: None,
},
tx,
)?;

Ok(rx)
}

pub fn run(cmd: Command) -> Result<Receiver<Activity>> {
let (tx, rx) = channel::<Activity>(100);

run_common(
cmd,
ActivityBuilder {
error_stream: "worker".to_string(),
exit_stream: "task".to_string(),
bgproc: None,
},
tx,
)?;

run_inner(cmd, ab, tx)?;
Ok(rx)
}

fn run_common(
fn run_inner(
mut cmd: Command,
ab: ActivityBuilder,
tx: Sender<Activity>,
Expand Down Expand Up @@ -289,14 +292,15 @@ fn run_common(
)
.unwrap();

/*
* Only send an exit notification if this is the primary task
* process.
*/
if ab.bgproc.is_none() {
tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap();
if ab.is_bg() {
/*
* No further notifications are required for background
* processes.
*/
return;
}

tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap();
false
}
Ok(es) => {
Expand All @@ -312,11 +316,12 @@ fn run_common(
let stdio_warning = !thread_done(&mut readout, "stdout", until)
| !thread_done(&mut readerr, "stderr", until);

if ab.bgproc.is_some() {
if ab.is_bg() {
/*
* No further notifications are required for background
* processes.
*/
return;
}

if let Some(sig) = es.signal() {
Expand All @@ -331,7 +336,7 @@ fn run_common(
}
};

assert!(ab.bgproc.is_none());
assert!(!ab.is_bg());

if stdio_warning {
tx.blocking_send(ab.warn(
Expand Down Expand Up @@ -447,13 +452,9 @@ impl BackgroundProcesses {
c.uid(uid);
c.gid(gid);

let pid = run_common(
let pid = run_inner(
c,
ActivityBuilder {
error_stream: format!("bg.{name}"),
exit_stream: format!("bg.{name}"),
bgproc: Some(name.to_string()),
},
ActivityBuilder::Bg(name.to_string()),
self.tx.clone(),
)
.map_err(|e| anyhow!("starting background process {name:?}: {e}"))?;
Expand Down Expand Up @@ -520,8 +521,7 @@ impl BackgroundProcesses {
self.rx.close();
while let Some(a) = self.rx.recv().await {
if let Activity::Output(o) = &a {
if o.stream.ends_with("stdout") || o.stream.ends_with("stderr")
{
if o.stream.is_output() {
lastwords.push(a);
}
}
Expand Down
Loading