From 231eb9eb33b929ec20f2453f3a397e44e98143f3 Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Fri, 17 Apr 2026 09:32:18 +0200 Subject: [PATCH 1/7] agent: fix missing return --- agent/src/exec.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/agent/src/exec.rs b/agent/src/exec.rs index d8e2aa5a..bcdf320c 100644 --- a/agent/src/exec.rs +++ b/agent/src/exec.rs @@ -289,14 +289,15 @@ fn run_common( ) .unwrap(); - /* - * Only send an exit notification if this is the primary task - * process. - */ - if ab.bgproc.is_none() { - tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap(); + if ab.bgproc.is_some() { + /* + * No further notifications are required for background + * processes. + */ + return; } + tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap(); false } Ok(es) => { @@ -317,6 +318,7 @@ fn run_common( * No further notifications are required for background * processes. */ + return; } if let Some(sig) = es.signal() { From 2683833521217054d950e52596e1f47d3252ee5a Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Tue, 14 Apr 2026 21:43:30 +0200 Subject: [PATCH 2/7] use an enum to define job streams --- Cargo.toml | 1 + agent/src/exec.rs | 146 ++++++++++++++--------------- agent/src/main.rs | 36 +++---- bin/src/main.rs | 44 ++++----- common/src/job_streams.rs | 87 +++++++++++++++++ common/src/lib.rs | 4 + github/server/src/variety/basic.rs | 133 +++++++++++++------------- jobsh/src/lib.rs | 37 ++++---- 8 files changed, 288 insertions(+), 200 deletions(-) create mode 100644 common/src/job_streams.rs diff --git a/Cargo.toml b/Cargo.toml index e46049a2..35bd8401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ version = "0.0.0" [workspace.lints.clippy] identity_op = "allow" many_single_char_names = "allow" +should_implement_trait = "allow" too_many_arguments = "allow" type_complexity = "allow" vec_init_then_push = "allow" diff --git a/agent/src/exec.rs b/agent/src/exec.rs index bcdf320c..282426cd 100644 --- 
a/agent/src/exec.rs +++ b/agent/src/exec.rs @@ -11,13 +11,14 @@ use std::time::{Duration, Instant}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use anyhow::{anyhow, bail, Result}; +use buildomat_common::JobStream; use chrono::prelude::*; use super::OutputRecord; fn spawn_reader( tx: Sender, - name: String, + name: JobStream, stream: Option, ) -> Option> where @@ -47,7 +48,10 @@ where let s = String::from_utf8_lossy(&buf); if tx - .blocking_send(Activity::msg(&name, s.trim_end())) + .blocking_send(Activity::msg( + name.clone(), + s.trim_end(), + )) .is_err() { /* @@ -63,7 +67,7 @@ where * server, but don't panic if we cannot. */ tx.blocking_send(Activity::msg( - "error", + JobStream::Error, &format!("failed to read {name}: {e:?}"), )) .ok(); @@ -76,7 +80,7 @@ where #[derive(Debug)] pub struct ExitDetails { - stream: String, + stream: JobStream, duration_ms: u64, when: DateTime, code: i32, @@ -85,7 +89,7 @@ pub struct ExitDetails { impl ExitDetails { pub(crate) fn to_record(&self) -> OutputRecord { OutputRecord { - stream: self.stream.to_string(), + stream: self.stream.clone(), msg: format!( "process exited: duration {} ms, exit code {}", self.duration_ms, self.code @@ -101,7 +105,7 @@ impl ExitDetails { #[derive(Clone, Debug)] pub struct OutputDetails { - stream: String, + stream: JobStream, msg: String, time: DateTime, } @@ -109,7 +113,7 @@ pub struct OutputDetails { impl OutputDetails { pub(crate) fn to_record(&self) -> OutputRecord { OutputRecord { - stream: self.stream.to_string(), + stream: self.stream.clone(), msg: self.msg.to_string(), time: self.time, } @@ -123,51 +127,76 @@ pub enum Activity { Complete, } -struct ActivityBuilder { - error_stream: String, - exit_stream: String, - bgproc: Option, +#[derive(Clone)] +pub enum ActivityBuilder { + Task, + Diag(String), + Bg(String), } impl ActivityBuilder { - fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity { - Activity::Exit(ExitDetails { - stream: self.exit_stream.to_string(), - 
duration_ms: end.duration_since(*start).as_millis() as u64, - when: Utc::now(), - code, - }) + fn stdout_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task => JobStream::Stdout, + ActivityBuilder::Diag(_) => JobStream::Stdout, + ActivityBuilder::Bg(name) => JobStream::BgStdout { name }, + } } - fn stdout_stream(&self) -> String { - if let Some(n) = &self.bgproc { - format!("bg.{n}.stdout") - } else { - "stdout".to_string() + fn stderr_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task => JobStream::Stderr, + ActivityBuilder::Diag(_) => JobStream::Stderr, + ActivityBuilder::Bg(name) => JobStream::BgStderr { name }, } } - fn stderr_stream(&self) -> String { - if let Some(n) = &self.bgproc { - format!("bg.{n}.stderr") - } else { - "stderr".to_string() + fn exit_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task => JobStream::Task, + ActivityBuilder::Diag(name) => JobStream::Diag { name }, + ActivityBuilder::Bg(name) => JobStream::Bg { name }, } } + fn error_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task => JobStream::Worker, + ActivityBuilder::Diag(name) => JobStream::Diag { name }, + ActivityBuilder::Bg(name) => JobStream::Bg { name }, + } + } + + fn is_bg(&self) -> bool { + matches!(self, ActivityBuilder::Bg(_)) + } + fn errmsg(&self, pfx: &str, msg: &str) -> String { let mut s = format!("{pfx}: "); - if let Some(bg) = &self.bgproc { - s += &format!("background process {bg:?}: "); + match self { + ActivityBuilder::Task => {} + ActivityBuilder::Diag(_) => {} + ActivityBuilder::Bg(name) => { + s += &format!("background process {name:?}: ") + } } s += ": "; s += msg; s } + fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity { + Activity::Exit(ExitDetails { + stream: self.exit_stream(), + duration_ms: end.duration_since(*start).as_millis() as u64, + when: Utc::now(), + code, + }) + } + fn err(&self, msg: &str) -> Activity { 
Activity::Output(OutputDetails { - stream: self.error_stream.to_string(), + stream: self.error_stream(), msg: self.errmsg("exec error", msg), time: Utc::now(), }) @@ -175,7 +204,7 @@ impl ActivityBuilder { fn warn(&self, msg: &str) -> Activity { Activity::Output(OutputDetails { - stream: self.error_stream.to_string(), + stream: self.error_stream(), msg: self.errmsg("exec warning", msg), time: Utc::now(), }) @@ -183,9 +212,9 @@ impl ActivityBuilder { } impl Activity { - fn msg(stream: &str, msg: &str) -> Activity { + fn msg(stream: JobStream, msg: &str) -> Activity { Activity::Output(OutputDetails { - stream: stream.to_string(), + stream, msg: msg.to_string(), time: Utc::now(), }) @@ -229,39 +258,13 @@ pub fn thread_done( } } -pub fn run_diagnostic(cmd: Command, name: &str) -> Result> { - let (tx, rx) = channel::(100); - - run_common( - cmd, - ActivityBuilder { - error_stream: format!("diag.{name}"), - exit_stream: format!("diag.{name}"), - bgproc: None, - }, - tx, - )?; - - Ok(rx) -} - -pub fn run(cmd: Command) -> Result> { +pub fn run(cmd: Command, ab: ActivityBuilder) -> Result> { let (tx, rx) = channel::(100); - - run_common( - cmd, - ActivityBuilder { - error_stream: "worker".to_string(), - exit_stream: "task".to_string(), - bgproc: None, - }, - tx, - )?; - + run_inner(cmd, ab, tx)?; Ok(rx) } -fn run_common( +fn run_inner( mut cmd: Command, ab: ActivityBuilder, tx: Sender, @@ -289,7 +292,7 @@ fn run_common( ) .unwrap(); - if ab.bgproc.is_some() { + if ab.is_bg() { /* * No further notifications are required for background * processes. @@ -313,7 +316,7 @@ fn run_common( let stdio_warning = !thread_done(&mut readout, "stdout", until) | !thread_done(&mut readerr, "stderr", until); - if ab.bgproc.is_some() { + if ab.is_bg() { /* * No further notifications are required for background * processes. 
@@ -333,7 +336,7 @@ fn run_common( } }; - assert!(ab.bgproc.is_none()); + assert!(!ab.is_bg()); if stdio_warning { tx.blocking_send(ab.warn( @@ -449,13 +452,9 @@ impl BackgroundProcesses { c.uid(uid); c.gid(gid); - let pid = run_common( + let pid = run_inner( c, - ActivityBuilder { - error_stream: format!("bg.{name}"), - exit_stream: format!("bg.{name}"), - bgproc: Some(name.to_string()), - }, + ActivityBuilder::Bg(name.to_string()), self.tx.clone(), ) .map_err(|e| anyhow!("starting background process {name:?}: {e}"))?; @@ -522,8 +521,7 @@ impl BackgroundProcesses { self.rx.close(); while let Some(a) = self.rx.recv().await { if let Activity::Output(o) = &a { - if o.stream.ends_with("stdout") || o.stream.ends_with("stderr") - { + if o.stream.is_output() { lastwords.push(a); } } diff --git a/agent/src/main.rs b/agent/src/main.rs index 3d677fc7..d4e617f5 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -36,6 +36,7 @@ mod shadow; mod upload; use control::protocol::{FactoryInfo, PayloadReq, PayloadRes}; +use exec::ActivityBuilder; struct Agent { log: Logger, @@ -92,18 +93,14 @@ impl ConfigFile { } struct OutputRecord { - stream: String, + stream: JobStream, time: DateTime, msg: String, } impl OutputRecord { - fn new(stream: &str, msg: &str) -> OutputRecord { - OutputRecord { - stream: stream.to_string(), - time: Utc::now(), - msg: msg.to_string(), - } + fn new(stream: JobStream, msg: &str) -> OutputRecord { + OutputRecord { stream, time: Utc::now(), msg: msg.to_string() } } } @@ -154,13 +151,13 @@ async fn append_job_worker( events.push(match ae { AppendJobEntry::JobEvent(rec) => WorkerAppendJobOrTask { task: None, - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, AppendJobEntry::TaskEvent(task, rec) => WorkerAppendJobOrTask { task: Some(task), - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, @@ -242,7 +239,7 @@ async fn append_worker_worker( events.push(match ae { 
AppendWorkerEntry::WorkerEvent(rec) => WorkerAppend { - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, @@ -305,8 +302,11 @@ impl ClientWrap { } async fn append_worker_msg(&self, name: &str, msg: &str) { - self.append_worker(OutputRecord::new(&format!("diag.{name}"), msg)) - .await + self.append_worker(OutputRecord::new( + JobStream::Diag { name: name.into() }, + msg, + )) + .await } async fn append_worker(&self, rec: OutputRecord) { @@ -331,7 +331,7 @@ impl ClientWrap { } async fn append_msg(&self, msg: &str) { - self.append(OutputRecord::new("worker", msg)).await; + self.append(OutputRecord::new(JobStream::Worker, msg)).await; } async fn append_task(&self, task: &WorkerPingTask, rec: OutputRecord) { @@ -344,7 +344,7 @@ impl ClientWrap { } async fn append_task_msg(&self, task: &WorkerPingTask, msg: &str) { - self.append_task(task, OutputRecord::new("task", msg)).await; + self.append_task(task, OutputRecord::new(JobStream::Task, msg)).await; } async fn flush_job_barrier(&self) { @@ -656,7 +656,7 @@ impl ClientWrap { cmd.uid(0); cmd.gid(0); - match exec::run_diagnostic(cmd, name) { + match exec::run(cmd, ActivityBuilder::Diag(name.into())) { Ok(c) => Ok(c), Err(e) => { /* @@ -666,7 +666,7 @@ impl ClientWrap { self.client .worker_append() .body(vec![WorkerAppend { - stream: "agent".into(), + stream: JobStream::Agent.to_string(), time: Utc::now(), payload: format!("ERROR: diag.{name} exec: {e:?}"), }]) @@ -1633,7 +1633,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { cmd.uid(t.uid); cmd.gid(t.gid); - match exec::run(cmd) { + match exec::run(cmd, ActivityBuilder::Task) { Ok(c) => { stage = Stage::Child(c, t, None); } @@ -1649,7 +1649,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { .job(cw.job_id().unwrap()) .body(vec![WorkerAppendJobOrTask { task: None, - stream: "agent".into(), + stream: JobStream::Agent.to_string(), time: Utc::now(), payload: format!("ERROR: exec: {:?}", e), }]) diff --git a/bin/src/main.rs 
b/bin/src/main.rs index 96f0fd49..be716a1e 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -644,33 +644,23 @@ async fn poll_job(l: &Level, id: &str, json: bool) -> Result<()> { loop { match wat.recv().await { Some(Ok(buildomat_client::EventOrState::Event(e))) => { - if json { - println!("{}", serde_json::to_string(&e)?); - } else if e.stream == "stdout" || e.stream == "stderr" { - println!("{}", e.payload); - } else if e.stream == "control" { - println!("|=| {}", e.payload); - } else if e.stream == "worker" { - println!("|W| {}", e.payload); - } else if e.stream == "task" { - println!("|T| {}", e.payload); - } else if e.stream == "console" { - println!("|C| {}", e.payload); - } else if e.stream == "panic" { - println!("|!| {}", e.payload); - } else if e.stream.starts_with("bg.") { - let t = e.stream.split('.').collect::>(); - if t.len() == 3 { - if t[2] == "stdout" || t[2] == "stderr" { - println!("[{}] {}", t[1], e.payload); - } else { - println!("{:?}", e); - } - } else { - println!("{:?}", e); - } - } else { - println!("{:?}", e); + let stream = JobStream::from_str(&e.stream); + let p = &e.payload; + match stream { + _ if json => println!("{}", serde_json::to_string(&e)?), + JobStream::BgStderr { name } => println!("[{name}] {p}"), + JobStream::BgStdout { name } => println!("[{name}] {p}"), + JobStream::Console => println!("|C| {p}"), + JobStream::Control => println!("|=| {p}"), + JobStream::Panic => println!("|!| {p}"), + JobStream::Task => println!("|T| {p}"), + JobStream::Worker => println!("|W| {p}"), + JobStream::Stderr | JobStream::Stdout => println!("{p}"), + JobStream::Agent + | JobStream::Bg { .. } + | JobStream::Diag { .. 
} + | JobStream::Error + | JobStream::Unknown(_) => println!("{e:?}"), } } Some(Ok(buildomat_client::EventOrState::State(st))) => { diff --git a/common/src/job_streams.rs b/common/src/job_streams.rs new file mode 100644 index 00000000..b9640e68 --- /dev/null +++ b/common/src/job_streams.rs @@ -0,0 +1,87 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum JobStream { + Agent, + Bg { name: String }, + BgStderr { name: String }, + BgStdout { name: String }, + Console, + Control, + Diag { name: String }, + Error, + Panic, + Stderr, + Stdout, + Task, + Worker, + Unknown(String), +} + +impl JobStream { + pub fn from_str(stream: &str) -> JobStream { + let parts = stream.split('.').collect::>(); + match parts.as_slice() { + ["agent"] => JobStream::Agent, + ["bg", name] => JobStream::Bg { name: name.to_string() }, + ["bg", name, "stdout"] => { + JobStream::BgStdout { name: name.to_string() } + } + ["bg", name, "stderr"] => { + JobStream::BgStderr { name: name.to_string() } + } + ["console"] => JobStream::Console, + ["control"] => JobStream::Control, + ["diag", name] => JobStream::Diag { name: name.to_string() }, + ["error"] => JobStream::Error, + ["panic"] => JobStream::Panic, + ["stderr"] => JobStream::Stderr, + ["stdout"] => JobStream::Stdout, + ["task"] => JobStream::Task, + ["worker"] => JobStream::Worker, + _ => JobStream::Unknown(stream.to_string()), + } + } + + pub fn is_output(&self) -> bool { + match self { + JobStream::BgStderr { .. } + | JobStream::BgStdout { .. } + | JobStream::Stderr + | JobStream::Stdout => true, + JobStream::Agent + | JobStream::Bg { .. } + | JobStream::Console + | JobStream::Control + | JobStream::Diag { .. 
} + | JobStream::Error + | JobStream::Panic + | JobStream::Task + | JobStream::Worker + | JobStream::Unknown(_) => false, + } + } +} + +impl std::fmt::Display for JobStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobStream::Agent => write!(f, "agent"), + JobStream::Bg { name } => write!(f, "bg.{name}"), + JobStream::BgStderr { name } => write!(f, "bg.{name}.stderr"), + JobStream::BgStdout { name } => write!(f, "bg.{name}.stdout"), + JobStream::Console => f.write_str("console"), + JobStream::Control => f.write_str("control"), + JobStream::Diag { name } => write!(f, "diag.{name}"), + JobStream::Error => f.write_str("error"), + JobStream::Panic => f.write_str("panic"), + JobStream::Stderr => f.write_str("stderr"), + JobStream::Stdout => f.write_str("stdout"), + JobStream::Task => f.write_str("task"), + JobStream::Worker => f.write_str("worker"), + JobStream::Unknown(s) => f.write_str(s), + } + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 88ea9656..dda5f806 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,8 @@ * Copyright 2025 Oxide Computer Company */ +mod job_streams; + use std::io::{IsTerminal, Read}; use std::path::Path; use std::sync::{Mutex, OnceLock}; @@ -16,6 +18,8 @@ use rusty_ulid::Ulid; use serde::{Deserialize, Serialize}; use slog::{o, Drain, Logger}; +pub use job_streams::JobStream; + pub fn read_toml, T>(n: P) -> Result where for<'de> T: Deserialize<'de>, diff --git a/github/server/src/variety/basic.rs b/github/server/src/variety/basic.rs index 1a24a35c..2db4731b 100644 --- a/github/server/src/variety/basic.rs +++ b/github/server/src/variety/basic.rs @@ -456,74 +456,43 @@ pub(crate) async fn run( p.event_minseq = ev.seq + 1; } - let stdio = ev.stream == "stdout" || ev.stream == "stderr"; - let console = ev.stream == "console"; - let panic = ev.stream == "panic"; - let worker = ev.stream == "worker"; - let bgproc = ev.stream.starts_with("bg."); - - if stdio || console || 
panic { - /* - * Some commands, like "cargo build --verbose", generate - * exceptionally long output lines, running into the - * thousands of characters. The long lines present two - * challenges: they are not readily visible without - * horizontal scrolling in the GitHub UI; the maximum status - * message length GitHub will accept is 64KB, and even a - * small number of long lines means our status update will - * not be accepted. - * - * If a line is longer than 90 characters, truncate it. - * Users will still be able to see the full output in our - * detailed view where we get to render the whole page. - */ - let mut line = if console { - "|C| " - } else if panic { - "|!|" - } else { - "| " + match JobStream::from_str(&ev.stream) { + JobStream::Stderr | JobStream::Stdout => { + let l = format!("| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); } - .to_string(); - - /* - * We support ANSI escapes in the log renderer, which means - * that tools will generate ANSI sequences. That doesn't - * work in the GitHub renderer, so we need to strip them out - * entirely. - */ - let payload = strip_ansi_escapes::strip_str(&ev.payload); - let mut chars = payload.chars(); - - for _ in 0..MAX_LINE_LENGTH { - if let Some(c) = chars.next() { - line.push(c); - } else { - break; - } + JobStream::Console => { + let l = format!("|C| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); } - if chars.next().is_some() { + JobStream::Panic => { + let l = format!("|!| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); + } + JobStream::Worker => { /* - * If any characters remain, the string was truncated. + * A job may produce a large number of files. We must + * not treat worker output (which is mostly about file + * uploads) as headers. They must be regular records + * that are discarded as they scroll off the top. 
*/ - line.push_str(" [...]"); + let line = format!("|W| {}", ev.payload); + p.events_tail.push_back((None, line)); } - - p.events_tail.push_back((None, line)); - } else if worker { - /* - * A job may produce a large number of files. We must not - * treat worker output (which is mostly about file uploads - * and so on) as headers. They must be regular records that - * are discarded as they scroll off the top. - */ - let line = format!("|W| {}", ev.payload); - p.events_tail.push_back((None, line)); - } else if !bgproc { - p.events_tail.push_back(( - Some(format!("{}/{:?}", ev.stream, ev.task)), - format!("{}: {}", ev.stream, ev.payload), - )); + JobStream::Agent + | JobStream::Control + | JobStream::Diag { .. } + | JobStream::Error + | JobStream::Task + | JobStream::Unknown(_) => { + p.events_tail.push_back(( + Some(format!("{}/{:?}", ev.stream, ev.task)), + format!("{}: {}", ev.stream, ev.payload), + )); + } + JobStream::Bg { .. } + | JobStream::BgStderr { .. } + | JobStream::BgStdout { .. } => {} } } @@ -837,6 +806,44 @@ pub(crate) async fn run( Ok(true) } +/* + * Some commands, like "cargo build --verbose", generate exceptionally long + * output lines, running into the thousands of characters. The long lines + * present two challenges: they are not readily visible without horizontal + * scrolling in the GitHub UI; the maximum status message length GitHub will + * accept is 64KB, and even a small number of long lines means our status + * update will not be accepted. + * + * If a line is longer than 90 characters, truncate it. Users will still be + * able to see the full output in our detailed view where we get to render the + * whole page. + */ +fn truncate_line(payload: &str) -> String { + /* + * We support ANSI escapes in the log renderer, which means that tools will + * generate ANSI sequences. That doesn't work in the GitHub renderer, so + * we need to strip them out entirely. 
+ */ + let payload = strip_ansi_escapes::strip_str(payload); + let mut chars = payload.chars(); + + let mut line = String::new(); + for _ in 0..MAX_LINE_LENGTH { + if let Some(c) = chars.next() { + line.push(c); + } else { + break; + } + } + if chars.next().is_some() { + /* + * If any characters remain, the string was truncated. + */ + line.push_str(" [...]"); + } + line +} + async fn bunyan_to_html( f: &mut tokio::fs::File, dec: &mut buildomat_bunyan::BunyanDecoder, diff --git a/jobsh/src/lib.rs b/jobsh/src/lib.rs index 3989764a..204ae7a8 100644 --- a/jobsh/src/lib.rs +++ b/jobsh/src/lib.rs @@ -5,25 +5,19 @@ use std::borrow::Cow; use buildomat_client::types::JobEvent; +use buildomat_common::JobStream; use chrono::SecondsFormat; use serde::Serialize; pub mod jobfile; pub mod variety; -/* - * Classes for these streams are defined in the "variety/basic/www/style.css", - * which we send along with the generated HTML output. - */ -const CSS_STREAM_CLASSES: &[&str] = - &["stdout", "stderr", "task", "worker", "control", "console", "panic"]; - pub trait JobEventEx { /** * Choose a colour (CSS class name) for the stream to which this event * belongs. */ - fn css_class(&self) -> String; + fn css_class(&self) -> &'static str; /** * Turn a job event into a somewhat abstract object with pre-formatted HTML @@ -34,15 +28,22 @@ pub trait JobEventEx { } impl JobEventEx for JobEvent { - fn css_class(&self) -> String { - let s = self.stream.as_str(); - - if CSS_STREAM_CLASSES.contains(&s) { - format!("s_{s}") - } else if s.starts_with("bg.") { - "s_bgtask".into() - } else { - "s_default".into() + fn css_class(&self) -> &'static str { + match JobStream::from_str(&self.stream) { + JobStream::Console => "s_console", + JobStream::Control => "s_control", + JobStream::Panic => "s_panic", + JobStream::Stderr => "s_stderr", + JobStream::Stdout => "s_stdout", + JobStream::Task => "s_task", + JobStream::Worker => "s_worker", + JobStream::Bg { .. } + | JobStream::BgStderr { .. 
} + | JobStream::BgStdout { .. } => "s_bgtask", + JobStream::Agent + | JobStream::Error + | JobStream::Diag { .. } + | JobStream::Unknown(_) => "s_default", } } @@ -137,7 +138,7 @@ fn encode_payload(payload: &str) -> Cow<'_, str> { #[derive(Debug, Serialize)] pub struct EventRow { task: Option, - css_class: String, + css_class: &'static str, fields: Vec, } From db2d93fa6bf5889400aecfeff554c6f55c8fcffe Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Mon, 4 May 2026 12:26:59 +0200 Subject: [PATCH 3/7] editorconfig: use tabs for js files --- .editorconfig | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.editorconfig b/.editorconfig index b7215c9b..8c42fff1 100644 --- a/.editorconfig +++ b/.editorconfig @@ -2,3 +2,6 @@ root = true [*] max_line_length = 80 + +[*.js] +indent_style = tab From 21fc6819a045e59c1f3e2af3451362d1c09af354 Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Mon, 13 Apr 2026 17:23:02 +0200 Subject: [PATCH 4/7] agent: refactor starting background processes --- agent/src/control/mod.rs | 46 ++++++++++++++++++----------------- agent/src/control/protocol.rs | 21 +++++++++------- agent/src/exec.rs | 32 ++++++++---------------- agent/src/main.rs | 12 ++------- 4 files changed, 48 insertions(+), 63 deletions(-) diff --git a/agent/src/control/mod.rs b/agent/src/control/mod.rs index 821fdbe3..6e445ac7 100644 --- a/agent/src/control/mod.rs +++ b/agent/src/control/mod.rs @@ -14,7 +14,7 @@ use tokio::{ }; use protocol::{ - Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes, + Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes, Process, }; pub(crate) mod protocol; @@ -387,14 +387,29 @@ async fn cmd_process_start(mut l: Level) -> Result<()> { let a = args!(l); - if a.args().len() < 2 { + let req = PayloadReq::ProcessStart(build_process(&l, a.args())?); + match l.context_mut().req(req).await? 
{ + PayloadRes::Error(e) => { + /* + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. + */ + bail!("could not start process: {e}"); + } + PayloadRes::Ack => Ok(()), + other => bail!("unexpected response: {other:?}"), + } +} + +fn build_process(l: &Level, args: &[String]) -> Result { + if args.len() < 2 { bad_args!(l, "specify at least a process name and a command to run"); } - let payload = PayloadReq::ProcessStart { - name: a.args()[0].to_string(), - cmd: a.args()[1].to_string(), - args: a.args().iter().skip(2).cloned().collect::>(), + Ok(Process { + name: args[0].clone(), + cmd: args[1].clone(), + args: args[2..].to_vec(), /* * The process will actually be spawned by the agent, which is @@ -403,24 +418,11 @@ async fn cmd_process_start(mut l: Level) -> Result<()> { * process can be started as if it were run from the job program * itself. */ - env: std::env::vars_os().collect::>(), - pwd: std::env::current_dir()?.to_str().unwrap().to_string(), - + env: std::env::vars_os().collect(), + pwd: std::env::current_dir()?.into_os_string(), uid: unsafe { libc::geteuid() }, gid: unsafe { libc::getegid() }, - }; - - match l.context_mut().req(payload).await? { - PayloadRes::Error(e) => { - /* - * This request is purely local to the agent, so an - * error is not something we should retry indefinitely. 
- */ - bail!("could not start process: {e}"); - } - PayloadRes::Ack => Ok(()), - other => bail!("unexpected response: {other:?}"), - } + }) } async fn factory_info(s: &mut Stuff) -> Result { diff --git a/agent/src/control/protocol.rs b/agent/src/control/protocol.rs index 2b0a3fcd..8a9ed4b1 100644 --- a/agent/src/control/protocol.rs +++ b/agent/src/control/protocol.rs @@ -23,20 +23,23 @@ pub struct StoreEntry { pub secret: bool, } +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct Process { + pub name: String, + pub cmd: String, + pub args: Vec, + pub env: Vec<(OsString, OsString)>, + pub pwd: OsString, + pub uid: u32, + pub gid: u32, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum PayloadReq { StoreGet(String), StorePut(String, String, bool), MetadataAddresses, - ProcessStart { - name: String, - cmd: String, - args: Vec, - env: Vec<(OsString, OsString)>, - pwd: String, - uid: u32, - gid: u32, - }, + ProcessStart(Process), FactoryInfo, } diff --git a/agent/src/exec.rs b/agent/src/exec.rs index 282426cd..4a9233d9 100644 --- a/agent/src/exec.rs +++ b/agent/src/exec.rs @@ -3,7 +3,6 @@ */ use std::collections::HashMap; -use std::ffi::OsString; use std::io::{BufRead, BufReader, Read}; use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::process::{Command, Stdio}; @@ -15,6 +14,7 @@ use buildomat_common::JobStream; use chrono::prelude::*; use super::OutputRecord; +use crate::control::protocol::Process; fn spawn_reader( tx: Sender, @@ -386,23 +386,11 @@ impl BackgroundProcesses { BackgroundProcesses { rx, tx, procs: Default::default() } } - pub fn start<'a, A, E>( - &mut self, - name: &str, - cmd: &str, - args: A, - env: E, - pwd: &str, - uid: u32, - gid: u32, - ) -> Result - where - A: IntoIterator, - E: IntoIterator, - { + pub fn start(&mut self, process: &Process) -> Result { /* * Process name must be unique within the task. 
*/ + let name = &process.name; if self.procs.contains_key(name) { bail!("background process {name:?} is already running"); } @@ -423,7 +411,7 @@ impl BackgroundProcesses { * child terminates, tearing down the rest of the children: */ c.arg("-l").arg("child"); - c.arg(cmd); + c.arg(&process.cmd); c }; @@ -433,10 +421,10 @@ impl BackgroundProcesses { * Regrettably other operating systems do not have contracts. For * now, just start the program. */ - Command::new(cmd) + Command::new(&process.cmd) }; - for a in args { + for a in &process.args { c.arg(a); } @@ -444,13 +432,13 @@ impl BackgroundProcesses { * Use the environment, working directory, and credentials passed to us * by the control program, not our own: */ - c.current_dir(pwd); + c.current_dir(&process.pwd); c.env_clear(); - for (k, v) in env { + for (k, v) in &process.env { c.env(k, v); } - c.uid(uid); - c.gid(gid); + c.uid(process.uid); + c.gid(process.gid); let pid = run_inner( c, diff --git a/agent/src/main.rs b/agent/src/main.rs index d4e617f5..e0ec19f1 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1438,16 +1438,8 @@ async fn cmd_run(mut l: Level) -> Result<()> { .map(|md| md.addresses().to_vec()) .unwrap_or_default(), ), - PayloadReq::ProcessStart { - name, - cmd, - args, - env, - pwd, - uid, - gid, - } => { - match bgprocs.start(name, cmd, args, env, pwd, *uid, *gid) { + PayloadReq::ProcessStart(process) => { + match bgprocs.start(process) { Ok(_) => PayloadRes::Ack, Err(e) => PayloadRes::Error(e.to_string()), } From 66218c9e1fffc599bd7a9578a031ba1ca0f602a3 Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Thu, 16 Apr 2026 11:54:18 +0200 Subject: [PATCH 5/7] agent: allow Stage::Child to run more than just tasks --- agent/src/exec.rs | 12 ++++++------ agent/src/main.rs | 42 ++++++++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/agent/src/exec.rs b/agent/src/exec.rs index 4a9233d9..28f635dd 100644 --- a/agent/src/exec.rs +++ 
b/agent/src/exec.rs @@ -129,7 +129,7 @@ pub enum Activity { #[derive(Clone)] pub enum ActivityBuilder { - Task, + Task(u32), Diag(String), Bg(String), } @@ -137,7 +137,7 @@ pub enum ActivityBuilder { impl ActivityBuilder { fn stdout_stream(&self) -> JobStream { match self.clone() { - ActivityBuilder::Task => JobStream::Stdout, + ActivityBuilder::Task(_) => JobStream::Stdout, ActivityBuilder::Diag(_) => JobStream::Stdout, ActivityBuilder::Bg(name) => JobStream::BgStdout { name }, } @@ -145,7 +145,7 @@ impl ActivityBuilder { fn stderr_stream(&self) -> JobStream { match self.clone() { - ActivityBuilder::Task => JobStream::Stderr, + ActivityBuilder::Task(_) => JobStream::Stderr, ActivityBuilder::Diag(_) => JobStream::Stderr, ActivityBuilder::Bg(name) => JobStream::BgStderr { name }, } @@ -153,7 +153,7 @@ impl ActivityBuilder { fn exit_stream(&self) -> JobStream { match self.clone() { - ActivityBuilder::Task => JobStream::Task, + ActivityBuilder::Task(_) => JobStream::Task, ActivityBuilder::Diag(name) => JobStream::Diag { name }, ActivityBuilder::Bg(name) => JobStream::Bg { name }, } @@ -161,7 +161,7 @@ impl ActivityBuilder { fn error_stream(&self) -> JobStream { match self.clone() { - ActivityBuilder::Task => JobStream::Worker, + ActivityBuilder::Task(_) => JobStream::Worker, ActivityBuilder::Diag(name) => JobStream::Diag { name }, ActivityBuilder::Bg(name) => JobStream::Bg { name }, } @@ -174,7 +174,7 @@ impl ActivityBuilder { fn errmsg(&self, pfx: &str, msg: &str) -> String { let mut s = format!("{pfx}: "); match self { - ActivityBuilder::Task => {} + ActivityBuilder::Task(_) => {} ActivityBuilder::Diag(_) => {} ActivityBuilder::Bg(name) => { s += &format!("background process {name:?}: ") diff --git a/agent/src/main.rs b/agent/src/main.rs index e0ec19f1..627ab455 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -334,17 +334,28 @@ impl ClientWrap { self.append(OutputRecord::new(JobStream::Worker, msg)).await; } - async fn append_task(&self, task: 
&WorkerPingTask, rec: OutputRecord) { + async fn append_activity(&self, ab: &ActivityBuilder, rec: OutputRecord) { self.tx .as_ref() .unwrap() - .send(AppendJobEntry::TaskEvent(task.id, rec)) + .send(match ab { + ActivityBuilder::Task(id) => { + AppendJobEntry::TaskEvent(*id, rec) + } + ActivityBuilder::Diag(_) | ActivityBuilder::Bg(_) => { + AppendJobEntry::JobEvent(rec) + } + }) .await .unwrap(); } async fn append_task_msg(&self, task: &WorkerPingTask, msg: &str) { - self.append_task(task, OutputRecord::new(JobStream::Task, msg)).await; + self.append_activity( + &ActivityBuilder::Task(task.id), + OutputRecord::new(JobStream::Task, msg), + ) + .await; } async fn flush_job_barrier(&self) { @@ -360,7 +371,7 @@ impl ClientWrap { rx.await.unwrap(); } - async fn task_complete(&self, task: &WorkerPingTask, failed: bool) { + async fn task_complete(&self, task_id: u32, failed: bool) { /* * Make sure any previously enqueued event log events have gone out to * the server before we complete the task. @@ -372,7 +383,7 @@ impl ClientWrap { .client .worker_task_complete() .job(self.job_id().unwrap()) - .task(task.id) + .task(task_id) .body_map(|body| body.failed(failed)) .send() .await @@ -1009,7 +1020,7 @@ enum Stage { Ready, Download(mpsc::Receiver), NextTask, - Child(mpsc::Receiver, WorkerPingTask, Option), + Child(mpsc::Receiver, ActivityBuilder, Option), Upload(mpsc::Receiver), StartPreDiagnostics(String), PreDiagnostics(mpsc::Receiver, Option), @@ -1625,9 +1636,10 @@ async fn cmd_run(mut l: Level) -> Result<()> { cmd.uid(t.uid); cmd.gid(t.gid); - match exec::run(cmd, ActivityBuilder::Task) { + let ab = ActivityBuilder::Task(t.id); + match exec::run(cmd, ab.clone()) { Ok(c) => { - stage = Stage::Child(c, t, None); + stage = Stage::Child(c, ab, None); } Err(e) => { job_failed = true; @@ -1651,7 +1663,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { } } } - Stage::Child(ch, t, failed) => { + Stage::Child(ch, ab, failed) => { let a = tokio::select! 
{ _ = pingfreq.tick() => { do_ping = true; @@ -1667,10 +1679,10 @@ async fn cmd_run(mut l: Level) -> Result<()> { match a { Some(exec::Activity::Output(o)) => { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } Some(exec::Activity::Exit(ex)) => { - cw.append_task(t, ex.to_record()).await; + cw.append_activity(ab, ex.to_record()).await; /* * Preserve the exit status for when we record task @@ -1689,21 +1701,23 @@ async fn cmd_run(mut l: Level) -> Result<()> { */ for a in bgprocs.killall().await { if let exec::Activity::Output(o) = a { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } } /* * Record completion of this task within the job. */ - cw.task_complete(t, failed.unwrap()).await; + if let ActivityBuilder::Task(id) = ab { + cw.task_complete(*id, failed.unwrap()).await; + } stage = Stage::NextTask; } None => { for a in bgprocs.killall().await { if let exec::Activity::Output(o) = a { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } } From 5c6c64f92e9ba5c21d921b0be0d9177818e003bd Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Fri, 17 Apr 2026 11:21:48 +0200 Subject: [PATCH 6/7] jobsh: make section splitting generic --- jobsh/src/lib.rs | 27 ++++++++++++++++++++++++--- jobsh/src/variety/basic.rs | 10 +++++----- variety/basic/www/live.js | 10 ++++++---- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/jobsh/src/lib.rs b/jobsh/src/lib.rs index 204ae7a8..642ce20a 100644 --- a/jobsh/src/lib.rs +++ b/jobsh/src/lib.rs @@ -7,7 +7,7 @@ use std::borrow::Cow; use buildomat_client::types::JobEvent; use buildomat_common::JobStream; use chrono::SecondsFormat; -use serde::Serialize; +use serde::{Serialize, Serializer}; pub mod jobfile; pub mod variety; @@ -68,9 +68,15 @@ impl JobEventEx for JobEvent { encode_payload(&self.payload), ); + let section = if let Some(id) = self.task { + EventSection::Task(id) + } else { + EventSection::None + }; 
+ EventRow { - task: self.task, css_class: self.css_class(), + section, fields: vec![ EventField { css_class: "num", @@ -137,8 +143,8 @@ fn encode_payload(payload: &str) -> Cow<'_, str> { #[derive(Debug, Serialize)] pub struct EventRow { - task: Option, css_class: &'static str, + section: EventSection, fields: Vec, } @@ -154,6 +160,21 @@ pub struct EventField { anchor: Option, } +#[derive(Debug, PartialEq, Eq)] +enum EventSection { + None, + Task(u32), +} + +impl Serialize for EventSection { + fn serialize(&self, s: S) -> Result { + match self { + EventSection::None => s.serialize_str("none"), + EventSection::Task(idx) => s.serialize_str(&format!("task:{idx}")), + } + } +} + #[cfg(test)] pub mod test { use super::*; diff --git a/jobsh/src/variety/basic.rs b/jobsh/src/variety/basic.rs index a5c32e67..cf5d05ec 100644 --- a/jobsh/src/variety/basic.rs +++ b/jobsh/src/variety/basic.rs @@ -13,7 +13,7 @@ use futures::future::BoxFuture; use hyper::Response; use serde::{Deserialize, Serialize}; -use crate::JobEventEx; +use crate::{EventSection, JobEventEx}; /* * We can use "deny_unknown_fields" here because the global frontmatter fields @@ -56,7 +56,7 @@ pub async fn output_table( ) -> Result { let mut out = "\n".to_string(); - let mut last = None; + let mut last = EventSection::None; out += "\n"; out += "\n"; @@ -112,10 +112,10 @@ pub async fn output_table( let evr = ev.event_row(); /* - * If the task has changed, render a full-width blank row in the + * If the section has changed, render a full-width blank row in the * table: */ - if evr.task != last { + if evr.section != last { let cols = evr .fields .iter() @@ -124,7 +124,7 @@ pub async fn output_table( out += &format!(""); } - last = evr.task; + last = evr.section; out += &format!("\n", evr.css_class); diff --git a/variety/basic/www/live.js b/variety/basic/www/live.js index d717eba8..8375433a 100644 --- a/variety/basic/www/live.js +++ b/variety/basic/www/live.js @@ -4,7 +4,7 @@ var EVT; var LOCAL_TIME = %LOCAL_TIME%; 
-var LAST_TASK = null; +var LAST_SECTION = null; var LAST_EVENT = null; /* @@ -68,9 +68,11 @@ basic_row(ev) } /* - * If the task has changed, render a full-width blank row in the table: + * If the section has changed, render a full-width blank row in the table: */ - if (evr.task !== LAST_TASK) { + if (LAST_SECTION === null) { + LAST_SECTION = evr.section; + } else if (evr.section !== LAST_SECTION) { let count = 0; for (let i = 0; i < evr.fields.length; i++) { let f = evr.fields[i]; @@ -87,7 +89,7 @@ basic_row(ev) tr.appendChild(td); tbl.appendChild(td); - LAST_TASK = evr.task; + LAST_SECTION = evr.section; } let tr = document.createElement("tr"); From b81447b7e939deca3ee7f71bf19c8a6d2b99059e Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Mon, 13 Apr 2026 18:15:22 +0200 Subject: [PATCH 7/7] agent: implement post tasks --- agent/src/control/mod.rs | 57 +++++++++ agent/src/control/protocol.rs | 20 ++++ agent/src/exec.rs | 6 + agent/src/main.rs | 178 ++++++++++++++++++++++++++--- bin/src/main.rs | 8 +- common/src/job_streams.rs | 16 +++ github/server/src/variety/basic.rs | 5 +- jobsh/src/lib.rs | 12 ++ variety/basic/www/style.css | 12 ++ 9 files changed, 296 insertions(+), 18 deletions(-) diff --git a/agent/src/control/mod.rs b/agent/src/control/mod.rs index 6e445ac7..1a95153f 100644 --- a/agent/src/control/mod.rs +++ b/agent/src/control/mod.rs @@ -123,6 +123,7 @@ pub async fn main() -> Result<()> { l.cmd("store", "access the job store", cmd!(cmd_store))?; l.cmd("address", "manage IP addresses for this job", cmd!(cmd_address))?; l.cmd("process", "manage background processes", cmd!(cmd_process))?; + l.cmd("post", "manage post tasks", cmd!(cmd_post))?; l.cmd("factory", "factory information for this worker", cmd!(cmd_factory))?; l.hcmd("eng", "for working on and testing buildomat", cmd!(cmd_eng))?; @@ -374,6 +375,62 @@ async fn cmd_store_put(mut l: Level) -> Result<()> { } } +async fn cmd_post(mut l: Level) -> Result<()> { + l.context_mut().connect().await?; + + l.cmd( 
+ "success", + "add a post task running on success", + cmd!(cmd_post_success), + )?; + l.cmd( + "failure", + "add a post task running on failure", + cmd!(cmd_post_failure), + )?; + l.cmd("always", "add a post task that always runs", cmd!(cmd_post_always))?; + + sel!(l).run().await +} + +async fn cmd_post_success(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostSuccess]).await +} + +async fn cmd_post_failure(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostFailure]).await +} + +async fn cmd_post_always(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostSuccess, PayloadReq::PostFailure]).await +} + +async fn post_inner( + mut l: Level, + payloads: &[fn(Process) -> PayloadReq], +) -> Result<()> { + l.usage_args(Some("NAME COMMAND [ARGS...]")); + + let a = args!(l); + + for payload in payloads { + let req = payload(build_process(&l, a.args())?); + match l.context_mut().req(req).await? { + PayloadRes::Error(e) => { + /* + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. 
+ */ + bail!("could not enqueue post task {:?}: {e}", a.args()[0]); + } + PayloadRes::Ack => {} + other => bail!("unexpected response: {other:?}"), + } + } + + Ok(()) +} + async fn cmd_process(mut l: Level) -> Result<()> { l.context_mut().connect().await?; diff --git a/agent/src/control/protocol.rs b/agent/src/control/protocol.rs index 8a9ed4b1..a13e901a 100644 --- a/agent/src/control/protocol.rs +++ b/agent/src/control/protocol.rs @@ -34,12 +34,32 @@ pub struct Process { pub gid: u32, } +impl Process { + pub fn validate(&self) -> Result<(), String> { + let name = &self.name; + + if name.len() > 32 { + return Err(format!("process name {name:?} is longer than 32")); + } + if let Some(c) = name + .chars() + .find(|&c| !c.is_ascii_alphanumeric() && c != '-' && c != '_') + { + return Err(format!("invalid char {c:?} in process name {name:?}")); + } + + Ok(()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum PayloadReq { StoreGet(String), StorePut(String, String, bool), MetadataAddresses, ProcessStart(Process), + PostSuccess(Process), + PostFailure(Process), FactoryInfo, } diff --git a/agent/src/exec.rs b/agent/src/exec.rs index 28f635dd..ef520bd9 100644 --- a/agent/src/exec.rs +++ b/agent/src/exec.rs @@ -132,6 +132,7 @@ pub enum ActivityBuilder { Task(u32), Diag(String), Bg(String), + Post(String), } impl ActivityBuilder { @@ -140,6 +141,7 @@ impl ActivityBuilder { ActivityBuilder::Task(_) => JobStream::Stdout, ActivityBuilder::Diag(_) => JobStream::Stdout, ActivityBuilder::Bg(name) => JobStream::BgStdout { name }, + ActivityBuilder::Post(name) => JobStream::PostStdout { name }, } } @@ -148,6 +150,7 @@ impl ActivityBuilder { ActivityBuilder::Task(_) => JobStream::Stderr, ActivityBuilder::Diag(_) => JobStream::Stderr, ActivityBuilder::Bg(name) => JobStream::BgStderr { name }, + ActivityBuilder::Post(name) => JobStream::PostStderr { name }, } } @@ -156,6 +159,7 @@ impl ActivityBuilder { ActivityBuilder::Task(_) => JobStream::Task, 
ActivityBuilder::Diag(name) => JobStream::Diag { name }, ActivityBuilder::Bg(name) => JobStream::Bg { name }, + ActivityBuilder::Post(name) => JobStream::Post { name }, } } @@ -164,6 +168,7 @@ ActivityBuilder::Task(_) => JobStream::Worker, ActivityBuilder::Diag(name) => JobStream::Diag { name }, ActivityBuilder::Bg(name) => JobStream::Bg { name }, + ActivityBuilder::Post(name) => JobStream::Post { name }, } } @@ -176,6 +181,7 @@ match self { ActivityBuilder::Task(_) => {} ActivityBuilder::Diag(_) => {} + ActivityBuilder::Post(_) => {} ActivityBuilder::Bg(name) => { s += &format!("background process {name:?}: ") } diff --git a/agent/src/main.rs b/agent/src/main.rs index 627ab455..1a88e769 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -60,7 +60,9 @@ mod os_constants { } use os_constants::*; -use crate::control::protocol::StoreEntry; +use crate::control::protocol::{Process, StoreEntry}; + +const QUOTA_POST_TASKS_PER_TYPE: usize = 32; #[derive(Serialize, Deserialize)] struct ConfigFile { @@ -342,9 +344,9 @@ impl ClientWrap { ActivityBuilder::Task(id) => { AppendJobEntry::TaskEvent(*id, rec) } - ActivityBuilder::Diag(_) | ActivityBuilder::Bg(_) => { - AppendJobEntry::JobEvent(rec) - } + ActivityBuilder::Diag(_) + | ActivityBuilder::Bg(_) + | ActivityBuilder::Post(_) => AppendJobEntry::JobEvent(rec), }) .await .unwrap(); @@ -1020,7 +1022,13 @@ enum Stage { Ready, Download(mpsc::Receiver), NextTask, - Child(mpsc::Receiver, ActivityBuilder, Option), + NextPostTask(PostQueue), + Child( + mpsc::Receiver, + ActivityBuilder, + Option, + NextStage, + ), Upload(mpsc::Receiver), StartPreDiagnostics(String), PreDiagnostics(mpsc::Receiver, Option), @@ -1030,6 +1038,22 @@ enum Stage { Broken, } +/* + * Unfortunately we can't add the next stage as a field of the stage: due to how + stages are used we wouldn't be able to unbox it if we were to add Box.
+ */ +#[derive(Clone, Copy)] +enum NextStage { + NextTask, + NextPostTask(PostQueue), +} + +#[derive(Clone, Copy)] +enum PostQueue { + Success, + Failure, +} + async fn cmd_install(mut l: Level) -> Result<()> { l.usage_args(Some("BASEURL BOOTSTRAP_TOKEN")); l.optopt("N", "", "set nodename of machine", "NODENAME"); @@ -1225,6 +1249,8 @@ async fn cmd_run(mut l: Level) -> Result<()> { let mut control = control::server::listen()?; let mut creq: Option = None; let mut bgprocs = exec::BackgroundProcesses::new(); + let mut post_tasks_success = VecDeque::new(); + let mut post_tasks_failure = VecDeque::new(); let mut metadata: Option = None; let mut factory: Option = None; @@ -1450,11 +1476,21 @@ async fn cmd_run(mut l: Level) -> Result<()> { .unwrap_or_default(), ), PayloadReq::ProcessStart(process) => { - match bgprocs.start(process) { - Ok(_) => PayloadRes::Ack, - Err(e) => PayloadRes::Error(e.to_string()), + if let Err(err) = process.validate() { + PayloadRes::Error(err) + } else { + match bgprocs.start(process) { + Ok(_) => PayloadRes::Ack, + Err(e) => PayloadRes::Error(e.to_string()), + } } } + PayloadReq::PostSuccess(process) => { + add_post_task(&mut post_tasks_success, process) + } + PayloadReq::PostFailure(process) => { + add_post_task(&mut post_tasks_failure, process) + } PayloadReq::FactoryInfo => { if let Some(f) = &factory { PayloadRes::FactoryInfo(FactoryInfo { @@ -1561,7 +1597,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { if job_failed || tasks.is_empty() { /* * There are no more tasks to complete, so move on to - * uploading outputs. + * running post tasks. 
*/ info!( log, @@ -1569,10 +1605,11 @@ async fn cmd_run(mut l: Level) -> Result<()> { cw.job_id().unwrap(), ); - stage = Stage::Upload(upload::upload( - cw.clone(), - cw.job.as_ref().unwrap().output_rules.clone(), - )); + stage = Stage::NextPostTask(if job_failed { + PostQueue::Failure + } else { + PostQueue::Success + }); continue; } @@ -1639,7 +1676,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { let ab = ActivityBuilder::Task(t.id); match exec::run(cmd, ab.clone()) { Ok(c) => { - stage = Stage::Child(c, ab, None); + stage = Stage::Child(c, ab, None, NextStage::NextTask); } Err(e) => { job_failed = true; @@ -1663,7 +1700,89 @@ async fn cmd_run(mut l: Level) -> Result<()> { } } } - Stage::Child(ch, ab, failed) => { + Stage::NextPostTask(queue) => { + let post_tasks = match queue { + PostQueue::Success => &mut post_tasks_success, + PostQueue::Failure => &mut post_tasks_failure, + }; + + if post_tasks.is_empty() { + /* + * There are no more tasks to complete, so move on to + * uploading outputs. + */ + info!( + log, + "no more post tasks for job {}", + cw.job_id().unwrap() + ); + + stage = Stage::Upload(upload::upload( + cw.clone(), + cw.job.as_ref().unwrap().output_rules.clone(), + )); + continue; + } + + let t = post_tasks.pop_front().unwrap(); + + /* + * Emit an event that we can use to visually separate tasks + * in the output. 
+ */ + let msg = format!("starting post task: \"{}\"", t.name); + cw.append(OutputRecord::new( + JobStream::Post { name: t.name.clone() }, + &msg, + )) + .await; + + let mut cmd = Command::new(t.cmd); + cmd.current_dir(&t.pwd); + cmd.args(&t.args); + cmd.env_clear(); + for (k, v) in &t.env { + cmd.env(k, v); + } + cmd.uid(t.uid); + cmd.gid(t.gid); + + let ab = ActivityBuilder::Post(t.name.clone()); + match exec::run(cmd, ab.clone()) { + Ok(c) => { + stage = Stage::Child( + c, + ab, + None, + NextStage::NextPostTask(*queue), + ); + } + Err(e) => { + job_failed = true; + + /* + * Try to post the error we would have reported + * to the server, but don't try too hard. + */ + cw.client + .worker_job_append() + .job(cw.job_id().unwrap()) + .body(vec![WorkerAppendJobOrTask { + task: None, + stream: JobStream::Post { + name: t.name.clone(), + } + .to_string(), + time: Utc::now(), + payload: format!("ERROR: exec: {:?}", e), + }]) + .send() + .await + .ok(); + } + } + } + Stage::Child(ch, ab, failed, next_stage) => { let a = tokio::select! 
{ _ = pingfreq.tick() => { do_ping = true; @@ -1712,7 +1831,12 @@ async fn cmd_run(mut l: Level) -> Result<()> { cw.task_complete(*id, failed.unwrap()).await; } - stage = Stage::NextTask; + stage = match next_stage { + NextStage::NextTask => Stage::NextTask, + NextStage::NextPostTask(queue) => { + Stage::NextPostTask(*queue) + } + }; } None => { for a in bgprocs.killall().await { @@ -1874,6 +1998,28 @@ async fn cmd_run(mut l: Level) -> Result<()> { } } +fn add_post_task(queue: &mut VecDeque, task: &Process) -> PayloadRes { + let name = &task.name; + + if queue.len() >= QUOTA_POST_TASKS_PER_TYPE { + return PayloadRes::Error(format!( + "cannot register more than {QUOTA_POST_TASKS_PER_TYPE} post tasks" + )); + } + if queue.iter().any(|p| p.name == task.name) { + return PayloadRes::Error(format!( + "a post task named {name:?} already exists", + )); + } + + if let Err(err) = task.validate() { + return PayloadRes::Error(err); + } + + queue.push_back(task.clone()); + PayloadRes::Ack +} + #[tokio::main] async fn main() -> Result<()> { let cmdname = std::env::args().next().as_deref().and_then(|s| { diff --git a/bin/src/main.rs b/bin/src/main.rs index be716a1e..4a3a71f3 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -653,9 +653,15 @@ async fn poll_job(l: &Level, id: &str, json: bool) -> Result<()> { JobStream::Console => println!("|C| {p}"), JobStream::Control => println!("|=| {p}"), JobStream::Panic => println!("|!| {p}"), + JobStream::Post { .. } => println!("|P| {p}"), JobStream::Task => println!("|T| {p}"), JobStream::Worker => println!("|W| {p}"), - JobStream::Stderr | JobStream::Stdout => println!("{p}"), + JobStream::Stderr + | JobStream::Stdout + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } => { + println!("{p}") + } JobStream::Agent | JobStream::Bg { .. } | JobStream::Diag { .. 
} diff --git a/common/src/job_streams.rs b/common/src/job_streams.rs index b9640e68..f2bbbf08 100644 --- a/common/src/job_streams.rs +++ b/common/src/job_streams.rs @@ -13,6 +13,9 @@ pub enum JobStream { Diag { name: String }, Error, Panic, + Post { name: String }, + PostStderr { name: String }, + PostStdout { name: String }, Stderr, Stdout, Task, @@ -37,6 +40,13 @@ impl JobStream { ["diag", name] => JobStream::Diag { name: name.to_string() }, ["error"] => JobStream::Error, ["panic"] => JobStream::Panic, + ["post", name] => JobStream::Post { name: name.to_string() }, + ["post", name, "stderr"] => { + JobStream::PostStderr { name: name.to_string() } + } + ["post", name, "stdout"] => { + JobStream::PostStdout { name: name.to_string() } + } ["stderr"] => JobStream::Stderr, ["stdout"] => JobStream::Stdout, ["task"] => JobStream::Task, @@ -49,6 +59,8 @@ impl JobStream { match self { JobStream::BgStderr { .. } | JobStream::BgStdout { .. } + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } | JobStream::Stderr | JobStream::Stdout => true, JobStream::Agent @@ -58,6 +70,7 @@ impl JobStream { | JobStream::Diag { .. } | JobStream::Error | JobStream::Panic + | JobStream::Post { .. 
} | JobStream::Task | JobStream::Worker | JobStream::Unknown(_) => false, @@ -77,6 +90,9 @@ impl std::fmt::Display for JobStream { JobStream::Diag { name } => write!(f, "diag.{name}"), JobStream::Error => f.write_str("error"), JobStream::Panic => f.write_str("panic"), + JobStream::Post { name } => write!(f, "post.{name}"), + JobStream::PostStderr { name } => write!(f, "post.{name}.stderr"), + JobStream::PostStdout { name } => write!(f, "post.{name}.stdout"), JobStream::Stderr => f.write_str("stderr"), JobStream::Stdout => f.write_str("stdout"), JobStream::Task => f.write_str("task"), diff --git a/github/server/src/variety/basic.rs b/github/server/src/variety/basic.rs index 2db4731b..acea5c06 100644 --- a/github/server/src/variety/basic.rs +++ b/github/server/src/variety/basic.rs @@ -492,7 +492,10 @@ pub(crate) async fn run( } JobStream::Bg { .. } | JobStream::BgStderr { .. } - | JobStream::BgStdout { .. } => {} + | JobStream::BgStdout { .. } + | JobStream::Post { .. } + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } => {} } } diff --git a/jobsh/src/lib.rs b/jobsh/src/lib.rs index 642ce20a..51a579ea 100644 --- a/jobsh/src/lib.rs +++ b/jobsh/src/lib.rs @@ -33,6 +33,9 @@ impl JobEventEx for JobEvent { JobStream::Console => "s_console", JobStream::Control => "s_control", JobStream::Panic => "s_panic", + JobStream::Post { .. } => "s_post", + JobStream::PostStderr { .. } => "s_post_stderr", + JobStream::PostStdout { .. 
} => "s_post_stdout", JobStream::Stderr => "s_stderr", JobStream::Stdout => "s_stdout", JobStream::Task => "s_task", @@ -70,6 +73,13 @@ impl JobEventEx for JobEvent { let section = if let Some(id) = self.task { EventSection::Task(id) + } else if let Some(post) = self.stream.strip_prefix("post.") { + EventSection::Post( + post.split_once('.') + .map(|(name, _)| name) + .unwrap_or(post) + .into(), + ) } else { EventSection::None }; @@ -164,6 +174,7 @@ pub struct EventField { enum EventSection { None, Task(u32), + Post(String), } impl Serialize for EventSection { @@ -171,6 +182,7 @@ impl Serialize for EventSection { match self { EventSection::None => s.serialize_str("none"), EventSection::Task(idx) => s.serialize_str(&format!("task:{idx}")), + EventSection::Post(n) => s.serialize_str(&format!("post:{n}")), } } } diff --git a/variety/basic/www/style.css b/variety/basic/www/style.css index 469f841c..c64bd5f5 100644 --- a/variety/basic/www/style.css +++ b/variety/basic/www/style.css @@ -76,6 +76,18 @@ tr.s_default { background-color: #dddddd; } +tr.s_post { + background-color: #ffcceb; +} + +tr.s_post_stdout { + background-color: #fff0f9; +} + +tr.s_post_stderr { + background-color: #ffe5f5; +} + span.header { white-space: pre; font-family: monospace;
SEQ