diff --git a/.editorconfig b/.editorconfig index b7215c9b..8c42fff1 100644 --- a/.editorconfig +++ b/.editorconfig @@ -2,3 +2,6 @@ root = true [*] max_line_length = 80 + +[*.js] +indent_style = tab diff --git a/Cargo.toml b/Cargo.toml index e46049a2..35bd8401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ version = "0.0.0" [workspace.lints.clippy] identity_op = "allow" many_single_char_names = "allow" +should_implement_trait = "allow" too_many_arguments = "allow" type_complexity = "allow" vec_init_then_push = "allow" diff --git a/agent/src/control/mod.rs b/agent/src/control/mod.rs index 821fdbe3..1a95153f 100644 --- a/agent/src/control/mod.rs +++ b/agent/src/control/mod.rs @@ -14,7 +14,7 @@ use tokio::{ }; use protocol::{ - Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes, + Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes, Process, }; pub(crate) mod protocol; @@ -123,6 +123,7 @@ pub async fn main() -> Result<()> { l.cmd("store", "access the job store", cmd!(cmd_store))?; l.cmd("address", "manage IP addresses for this job", cmd!(cmd_address))?; l.cmd("process", "manage background processes", cmd!(cmd_process))?; + l.cmd("post", "manage post tasks", cmd!(cmd_post))?; l.cmd("factory", "factory information for this worker", cmd!(cmd_factory))?; l.hcmd("eng", "for working on and testing buildomat", cmd!(cmd_eng))?; @@ -374,6 +375,62 @@ async fn cmd_store_put(mut l: Level) -> Result<()> { } } +async fn cmd_post(mut l: Level) -> Result<()> { + l.context_mut().connect().await?; + + l.cmd( + "success", + "add a post task running on success", + cmd!(cmd_post_success), + )?; + l.cmd( + "failure", + "add a post task running on failure", + cmd!(cmd_post_failure), + )?; + l.cmd("always", "add a post task that always runs", cmd!(cmd_post_always))?; + + sel!(l).run().await +} + +async fn cmd_post_success(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostSuccess]).await +} + +async fn cmd_post_failure(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostFailure]).await +} + +async fn cmd_post_always(l: Level) -> Result<()> { + post_inner(l, &[PayloadReq::PostSuccess, PayloadReq::PostFailure]).await +} + +async fn post_inner( + mut l: Level, + payloads: &[fn(Process) -> PayloadReq], +) -> Result<()> { + l.usage_args(Some("NAME COMMAND [ARGS...]")); + + let a = args!(l); + + for payload in payloads { + let req = payload(build_process(&l, a.args())?); + match l.context_mut().req(req).await? { + PayloadRes::Error(e) => { + /* + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. + */ + bail!("could not enqueue post task {:?}: {e}", a.args()[0]); + } + PayloadRes::Ack => {} + other => bail!("unexpected response: {other:?}"), + } + } + + Ok(()) +} + async fn cmd_process(mut l: Level) -> Result<()> { l.context_mut().connect().await?; @@ -387,14 +444,29 @@ async fn cmd_process_start(mut l: Level) -> Result<()> { let a = args!(l); - if a.args().len() < 2 { + let req = PayloadReq::ProcessStart(build_process(&l, a.args())?); + match l.context_mut().req(req).await? { + PayloadRes::Error(e) => { + /* + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. + */ + bail!("could not start process: {e}"); + } + PayloadRes::Ack => Ok(()), + other => bail!("unexpected response: {other:?}"), + } +} + +fn build_process(l: &Level, args: &[String]) -> Result { + if args.len() < 2 { bad_args!(l, "specify at least a process name and a command to run"); } - let payload = PayloadReq::ProcessStart { - name: a.args()[0].to_string(), - cmd: a.args()[1].to_string(), - args: a.args().iter().skip(2).cloned().collect::>(), + Ok(Process { + name: args[0].clone(), + cmd: args[1].clone(), + args: args[2..].to_vec(), /* * The process will actually be spawned by the agent, which is @@ -403,24 +475,11 @@ async fn cmd_process_start(mut l: Level) -> Result<()> { * process can be started as if it were run from the job program * itself. */ - env: std::env::vars_os().collect::>(), - pwd: std::env::current_dir()?.to_str().unwrap().to_string(), - + env: std::env::vars_os().collect(), + pwd: std::env::current_dir()?.into_os_string(), uid: unsafe { libc::geteuid() }, gid: unsafe { libc::getegid() }, - }; - - match l.context_mut().req(payload).await? { - PayloadRes::Error(e) => { - /* - * This request is purely local to the agent, so an - * error is not something we should retry indefinitely. - */ - bail!("could not start process: {e}"); - } - PayloadRes::Ack => Ok(()), - other => bail!("unexpected response: {other:?}"), - } + }) } async fn factory_info(s: &mut Stuff) -> Result { diff --git a/agent/src/control/protocol.rs b/agent/src/control/protocol.rs index 2b0a3fcd..a13e901a 100644 --- a/agent/src/control/protocol.rs +++ b/agent/src/control/protocol.rs @@ -23,20 +23,43 @@ pub struct StoreEntry { pub secret: bool, } +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct Process { + pub name: String, + pub cmd: String, + pub args: Vec, + pub env: Vec<(OsString, OsString)>, + pub pwd: OsString, + pub uid: u32, + pub gid: u32, +} + +impl Process { + pub fn validate(&self) -> Result<(), String> { + let name = &self.name; + + if name.len() > 32 { + return Err(format!("process name {name:?} is longer than 32")); + } + if let Some(c) = name + .chars() + .find(|&c| !c.is_ascii_alphanumeric() && c != '-' && c != '_') + { + return Err(format!("invalid char {c:?} in process name {name:?}")); + } + + Ok(()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum PayloadReq { StoreGet(String), StorePut(String, String, bool), MetadataAddresses, - ProcessStart { - name: String, - cmd: String, - args: Vec, - env: Vec<(OsString, OsString)>, - pwd: String, - uid: u32, - gid: u32, - }, + ProcessStart(Process), + PostSuccess(Process), + PostFailure(Process), FactoryInfo, } diff --git a/agent/src/exec.rs b/agent/src/exec.rs index d8e2aa5a..ef520bd9 100644 --- a/agent/src/exec.rs +++ b/agent/src/exec.rs @@ -3,7 +3,6 @@ */ use std::collections::HashMap; -use std::ffi::OsString; use std::io::{BufRead, BufReader, Read}; use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::process::{Command, Stdio}; @@ -11,13 +10,15 @@ use std::time::{Duration, Instant}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use anyhow::{anyhow, bail, Result}; +use buildomat_common::JobStream; use chrono::prelude::*; use super::OutputRecord; +use crate::control::protocol::Process; fn spawn_reader( tx: Sender, - name: String, + name: JobStream, stream: Option, ) -> Option> where @@ -47,7 +48,10 @@ where let s = String::from_utf8_lossy(&buf); if tx - .blocking_send(Activity::msg(&name, s.trim_end())) + .blocking_send(Activity::msg( + name.clone(), + s.trim_end(), + )) .is_err() { /* @@ -63,7 +67,7 @@ where * server, but don't panic if we cannot. */ tx.blocking_send(Activity::msg( - "error", + JobStream::Error, &format!("failed to read {name}: {e:?}"), )) .ok(); @@ -76,7 +80,7 @@ where #[derive(Debug)] pub struct ExitDetails { - stream: String, + stream: JobStream, duration_ms: u64, when: DateTime, code: i32, @@ -85,7 +89,7 @@ pub struct ExitDetails { impl ExitDetails { pub(crate) fn to_record(&self) -> OutputRecord { OutputRecord { - stream: self.stream.to_string(), + stream: self.stream.clone(), msg: format!( "process exited: duration {} ms, exit code {}", self.duration_ms, self.code @@ -101,7 +105,7 @@ impl ExitDetails { #[derive(Clone, Debug)] pub struct OutputDetails { - stream: String, + stream: JobStream, msg: String, time: DateTime, } @@ -109,7 +113,7 @@ pub struct OutputDetails { impl OutputDetails { pub(crate) fn to_record(&self) -> OutputRecord { OutputRecord { - stream: self.stream.to_string(), + stream: self.stream.clone(), msg: self.msg.to_string(), time: self.time, } @@ -123,51 +127,82 @@ pub enum Activity { Complete, } -struct ActivityBuilder { - error_stream: String, - exit_stream: String, - bgproc: Option, +#[derive(Clone)] +pub enum ActivityBuilder { + Task(u32), + Diag(String), + Bg(String), + Post(String), } impl ActivityBuilder { - fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity { - Activity::Exit(ExitDetails { - stream: self.exit_stream.to_string(), - duration_ms: end.duration_since(*start).as_millis() as u64, - when: Utc::now(), - code, - }) + fn stdout_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task(_) => JobStream::Stdout, + ActivityBuilder::Diag(_) => JobStream::Stdout, + ActivityBuilder::Bg(name) => JobStream::BgStdout { name }, + ActivityBuilder::Post(name) => JobStream::PostStdout { name }, + } + } + + fn stderr_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task(_) => JobStream::Stderr, + ActivityBuilder::Diag(_) => JobStream::Stderr, + ActivityBuilder::Bg(name) => JobStream::BgStderr { name }, + ActivityBuilder::Post(name) => JobStream::PostStderr { name }, + } } - fn stdout_stream(&self) -> String { - if let Some(n) = &self.bgproc { - format!("bg.{n}.stdout") - } else { - "stdout".to_string() + fn exit_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task(_) => JobStream::Task, + ActivityBuilder::Diag(name) => JobStream::Diag { name }, + ActivityBuilder::Bg(name) => JobStream::Bg { name }, + ActivityBuilder::Post(name) => JobStream::Post { name }, } } - fn stderr_stream(&self) -> String { - if let Some(n) = &self.bgproc { - format!("bg.{n}.stderr") - } else { - "stderr".to_string() + fn error_stream(&self) -> JobStream { + match self.clone() { + ActivityBuilder::Task(_) => JobStream::Worker, + ActivityBuilder::Diag(name) => JobStream::Diag { name }, + ActivityBuilder::Bg(name) => JobStream::Bg { name }, + ActivityBuilder::Post(name) => JobStream::Post { name }, } } + fn is_bg(&self) -> bool { + matches!(self, ActivityBuilder::Bg(_)) + } + fn errmsg(&self, pfx: &str, msg: &str) -> String { let mut s = format!("{pfx}: "); - if let Some(bg) = &self.bgproc { - s += &format!("background process {bg:?}: "); + match self { + ActivityBuilder::Task(_) => {} + ActivityBuilder::Diag(_) => {} + ActivityBuilder::Post(_) => {} + ActivityBuilder::Bg(name) => { + s += &format!("background process {name:?}: ") + } } s += ": "; s += msg; s } + fn exit(&self, start: &Instant, end: &Instant, code: i32) -> Activity { + Activity::Exit(ExitDetails { + stream: self.exit_stream(), + duration_ms: end.duration_since(*start).as_millis() as u64, + when: Utc::now(), + code, + }) + } + fn err(&self, msg: &str) -> Activity { Activity::Output(OutputDetails { - stream: self.error_stream.to_string(), + stream: self.error_stream(), msg: self.errmsg("exec error", msg), time: Utc::now(), }) @@ -175,7 +210,7 @@ impl ActivityBuilder { fn warn(&self, msg: &str) -> Activity { Activity::Output(OutputDetails { - stream: self.error_stream.to_string(), + stream: self.error_stream(), msg: self.errmsg("exec warning", msg), time: Utc::now(), }) @@ -183,9 +218,9 @@ impl ActivityBuilder { } impl Activity { - fn msg(stream: &str, msg: &str) -> Activity { + fn msg(stream: JobStream, msg: &str) -> Activity { Activity::Output(OutputDetails { - stream: stream.to_string(), + stream, msg: msg.to_string(), time: Utc::now(), }) @@ -229,39 +264,13 @@ pub fn thread_done( } } -pub fn run_diagnostic(cmd: Command, name: &str) -> Result> { +pub fn run(cmd: Command, ab: ActivityBuilder) -> Result> { let (tx, rx) = channel::(100); - - run_common( - cmd, - ActivityBuilder { - error_stream: format!("diag.{name}"), - exit_stream: format!("diag.{name}"), - bgproc: None, - }, - tx, - )?; - - Ok(rx) -} - -pub fn run(cmd: Command) -> Result> { - let (tx, rx) = channel::(100); - - run_common( - cmd, - ActivityBuilder { - error_stream: "worker".to_string(), - exit_stream: "task".to_string(), - bgproc: None, - }, - tx, - )?; - + run_inner(cmd, ab, tx)?; Ok(rx) } -fn run_common( +fn run_inner( mut cmd: Command, ab: ActivityBuilder, tx: Sender, @@ -289,14 +298,15 @@ fn run_common( ) .unwrap(); - /* - * Only send an exit notification if this is the primary task - * process. - */ - if ab.bgproc.is_none() { - tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap(); + if ab.is_bg() { + /* + * No further notifications are required for background + * processes. + */ + return; } + tx.blocking_send(ab.exit(&start, &end, i32::MAX)).unwrap(); false } Ok(es) => { @@ -312,11 +322,12 @@ fn run_common( let stdio_warning = !thread_done(&mut readout, "stdout", until) | !thread_done(&mut readerr, "stderr", until); - if ab.bgproc.is_some() { + if ab.is_bg() { /* * No further notifications are required for background * processes. */ + return; } if let Some(sig) = es.signal() { @@ -331,7 +342,7 @@ fn run_common( } }; - assert!(ab.bgproc.is_none()); + assert!(!ab.is_bg()); if stdio_warning { tx.blocking_send(ab.warn( @@ -381,23 +392,11 @@ impl BackgroundProcesses { BackgroundProcesses { rx, tx, procs: Default::default() } } - pub fn start<'a, A, E>( - &mut self, - name: &str, - cmd: &str, - args: A, - env: E, - pwd: &str, - uid: u32, - gid: u32, - ) -> Result - where - A: IntoIterator, - E: IntoIterator, - { + pub fn start(&mut self, process: &Process) -> Result { /* * Process name must be unique within the task. */ + let name = &process.name; if self.procs.contains_key(name) { bail!("background process {name:?} is already running"); } @@ -418,7 +417,7 @@ impl BackgroundProcesses { * child terminates, tearing down the rest of the children: */ c.arg("-l").arg("child"); - c.arg(cmd); + c.arg(&process.cmd); c }; @@ -428,10 +427,10 @@ impl BackgroundProcesses { * Regrettably other operating systems do not have contracts. For * now, just start the program. */ - Command::new(cmd) + Command::new(&process.cmd) }; - for a in args { + for a in &process.args { c.arg(a); } @@ -439,21 +438,17 @@ impl BackgroundProcesses { * Use the environment, working directory, and credentials passed to us * by the control program, not our own: */ - c.current_dir(pwd); + c.current_dir(&process.pwd); c.env_clear(); - for (k, v) in env { + for (k, v) in &process.env { c.env(k, v); } - c.uid(uid); - c.gid(gid); + c.uid(process.uid); + c.gid(process.gid); - let pid = run_common( + let pid = run_inner( c, - ActivityBuilder { - error_stream: format!("bg.{name}"), - exit_stream: format!("bg.{name}"), - bgproc: Some(name.to_string()), - }, + ActivityBuilder::Bg(name.to_string()), self.tx.clone(), ) .map_err(|e| anyhow!("starting background process {name:?}: {e}"))?; @@ -520,8 +515,7 @@ impl BackgroundProcesses { self.rx.close(); while let Some(a) = self.rx.recv().await { if let Activity::Output(o) = &a { - if o.stream.ends_with("stdout") || o.stream.ends_with("stderr") - { + if o.stream.is_output() { lastwords.push(a); } } diff --git a/agent/src/main.rs b/agent/src/main.rs index 3d677fc7..1a88e769 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -36,6 +36,7 @@ mod shadow; mod upload; use control::protocol::{FactoryInfo, PayloadReq, PayloadRes}; +use exec::ActivityBuilder; struct Agent { log: Logger, @@ -59,7 +60,9 @@ mod os_constants { } use os_constants::*; -use crate::control::protocol::StoreEntry; +use crate::control::protocol::{Process, StoreEntry}; + +const QUOTA_POST_TASKS_PER_TYPE: usize = 32; #[derive(Serialize, Deserialize)] struct ConfigFile { @@ -92,18 +95,14 @@ impl ConfigFile { } struct OutputRecord { - stream: String, + stream: JobStream, time: DateTime, msg: String, } impl OutputRecord { - fn new(stream: &str, msg: &str) -> OutputRecord { - OutputRecord { - stream: stream.to_string(), - time: Utc::now(), - msg: msg.to_string(), - } + fn new(stream: JobStream, msg: &str) -> OutputRecord { + OutputRecord { stream, time: Utc::now(), msg: msg.to_string() } } } @@ -154,13 +153,13 @@ async fn append_job_worker( events.push(match ae { AppendJobEntry::JobEvent(rec) => WorkerAppendJobOrTask { task: None, - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, AppendJobEntry::TaskEvent(task, rec) => WorkerAppendJobOrTask { task: Some(task), - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, @@ -242,7 +241,7 @@ async fn append_worker_worker( events.push(match ae { AppendWorkerEntry::WorkerEvent(rec) => WorkerAppend { - stream: rec.stream, + stream: rec.stream.to_string(), payload: rec.msg, time: rec.time, }, @@ -305,8 +304,11 @@ impl ClientWrap { } async fn append_worker_msg(&self, name: &str, msg: &str) { - self.append_worker(OutputRecord::new(&format!("diag.{name}"), msg)) - .await + self.append_worker(OutputRecord::new( + JobStream::Diag { name: name.into() }, + msg, + )) + .await } async fn append_worker(&self, rec: OutputRecord) { @@ -331,20 +333,31 @@ impl ClientWrap { } async fn append_msg(&self, msg: &str) { - self.append(OutputRecord::new("worker", msg)).await; + self.append(OutputRecord::new(JobStream::Worker, msg)).await; } - async fn append_task(&self, task: &WorkerPingTask, rec: OutputRecord) { + async fn append_activity(&self, ab: &ActivityBuilder, rec: OutputRecord) { self.tx .as_ref() .unwrap() - .send(AppendJobEntry::TaskEvent(task.id, rec)) + .send(match ab { + ActivityBuilder::Task(id) => { + AppendJobEntry::TaskEvent(*id, rec) + } + ActivityBuilder::Diag(_) + | ActivityBuilder::Bg(_) + | ActivityBuilder::Post(_) => AppendJobEntry::JobEvent(rec), + }) .await .unwrap(); } async fn append_task_msg(&self, task: &WorkerPingTask, msg: &str) { - self.append_task(task, OutputRecord::new("task", msg)).await; + self.append_activity( + &ActivityBuilder::Task(task.id), + OutputRecord::new(JobStream::Task, msg), + ) + .await; } async fn flush_job_barrier(&self) { @@ -360,7 +373,7 @@ impl ClientWrap { rx.await.unwrap(); } - async fn task_complete(&self, task: &WorkerPingTask, failed: bool) { + async fn task_complete(&self, task_id: u32, failed: bool) { /* * Make sure any previously enqueued event log events have gone out to * the server before we complete the task. @@ -372,7 +385,7 @@ impl ClientWrap { .client .worker_task_complete() .job(self.job_id().unwrap()) - .task(task.id) + .task(task_id) .body_map(|body| body.failed(failed)) .send() .await @@ -656,7 +669,7 @@ impl ClientWrap { cmd.uid(0); cmd.gid(0); - match exec::run_diagnostic(cmd, name) { + match exec::run(cmd, ActivityBuilder::Diag(name.into())) { Ok(c) => Ok(c), Err(e) => { /* @@ -666,7 +679,7 @@ impl ClientWrap { self.client .worker_append() .body(vec![WorkerAppend { - stream: "agent".into(), + stream: JobStream::Agent.to_string(), time: Utc::now(), payload: format!("ERROR: diag.{name} exec: {e:?}"), }]) @@ -1009,7 +1022,13 @@ enum Stage { Ready, Download(mpsc::Receiver), NextTask, - Child(mpsc::Receiver, WorkerPingTask, Option), + NextPostTask(PostQueue), + Child( + mpsc::Receiver, + ActivityBuilder, + Option, + NextStage, + ), Upload(mpsc::Receiver), StartPreDiagnostics(String), PreDiagnostics(mpsc::Receiver, Option), @@ -1019,6 +1038,22 @@ enum Stage { Broken, } +/* + * Unfortunately we can't add the next stage as a filed of the stage: due to how + * stages are used we wouldn't be able to unbox it if we were to add Box. + */ +#[derive(Clone, Copy)] +enum NextStage { + NextTask, + NextPostTask(PostQueue), +} + +#[derive(Clone, Copy)] +enum PostQueue { + Success, + Failure, +} + async fn cmd_install(mut l: Level) -> Result<()> { l.usage_args(Some("BASEURL BOOTSTRAP_TOKEN")); l.optopt("N", "", "set nodename of machine", "NODENAME"); @@ -1214,6 +1249,8 @@ async fn cmd_run(mut l: Level) -> Result<()> { let mut control = control::server::listen()?; let mut creq: Option = None; let mut bgprocs = exec::BackgroundProcesses::new(); + let mut post_tasks_success = VecDeque::new(); + let mut post_tasks_failure = VecDeque::new(); let mut metadata: Option = None; let mut factory: Option = None; @@ -1438,20 +1475,22 @@ async fn cmd_run(mut l: Level) -> Result<()> { .map(|md| md.addresses().to_vec()) .unwrap_or_default(), ), - PayloadReq::ProcessStart { - name, - cmd, - args, - env, - pwd, - uid, - gid, - } => { - match bgprocs.start(name, cmd, args, env, pwd, *uid, *gid) { - Ok(_) => PayloadRes::Ack, - Err(e) => PayloadRes::Error(e.to_string()), + PayloadReq::ProcessStart(process) => { + if let Err(err) = process.validate() { + PayloadRes::Error(err) + } else { + match bgprocs.start(process) { + Ok(_) => PayloadRes::Ack, + Err(e) => PayloadRes::Error(e.to_string()), + } } } + PayloadReq::PostSuccess(process) => { + add_post_task(&mut post_tasks_success, process) + } + PayloadReq::PostFailure(process) => { + add_post_task(&mut post_tasks_failure, process) + } PayloadReq::FactoryInfo => { if let Some(f) = &factory { PayloadRes::FactoryInfo(FactoryInfo { @@ -1558,7 +1597,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { if job_failed || tasks.is_empty() { /* * There are no more tasks to complete, so move on to - * uploading outputs. + * running post tasks. */ info!( log, @@ -1566,10 +1605,11 @@ async fn cmd_run(mut l: Level) -> Result<()> { cw.job_id().unwrap(), ); - stage = Stage::Upload(upload::upload( - cw.clone(), - cw.job.as_ref().unwrap().output_rules.clone(), - )); + stage = Stage::NextPostTask(if job_failed { + PostQueue::Failure + } else { + PostQueue::Success + }); continue; } @@ -1633,9 +1673,10 @@ async fn cmd_run(mut l: Level) -> Result<()> { cmd.uid(t.uid); cmd.gid(t.gid); - match exec::run(cmd) { + let ab = ActivityBuilder::Task(t.id); + match exec::run(cmd, ab.clone()) { Ok(c) => { - stage = Stage::Child(c, t, None); + stage = Stage::Child(c, ab, None, NextStage::NextTask); } Err(e) => { job_failed = true; @@ -1649,7 +1690,7 @@ async fn cmd_run(mut l: Level) -> Result<()> { .job(cw.job_id().unwrap()) .body(vec![WorkerAppendJobOrTask { task: None, - stream: "agent".into(), + stream: JobStream::Agent.to_string(), time: Utc::now(), payload: format!("ERROR: exec: {:?}", e), }]) @@ -1659,7 +1700,89 @@ async fn cmd_run(mut l: Level) -> Result<()> { } } } - Stage::Child(ch, t, failed) => { + Stage::NextPostTask(queue) => { + let post_tasks = match queue { + PostQueue::Success => &mut post_tasks_success, + PostQueue::Failure => &mut post_tasks_failure, + }; + + if post_tasks.is_empty() { + /* + * There are no more tasks to complete, so move on to + * uploading outputs. + */ + info!( + log, + "no more post tasks for job {}", + cw.job_id().unwrap() + ); + + stage = Stage::Upload(upload::upload( + cw.clone(), + cw.job.as_ref().unwrap().output_rules.clone(), + )); + continue; + } + + let t = post_tasks.pop_front().unwrap(); + + /* + * Emit an event that we can use to visually separate tasks + * in the output. + */ + let msg = format!("starting post task: \"{}\"", t.name); + cw.append(OutputRecord::new( + JobStream::Post { name: t.name.clone() }, + &msg, + )) + .await; + + let mut cmd = Command::new(t.cmd); + cmd.current_dir(&t.pwd); + cmd.args(&t.args); + cmd.env_clear(); + for (k, v) in &t.env { + cmd.env(k, v); + } + cmd.uid(t.uid); + cmd.gid(t.gid); + + let ab = ActivityBuilder::Post(t.name.clone()); + match exec::run(cmd, ab.clone()) { + Ok(c) => { + stage = Stage::Child( + c, + ab, + None, + NextStage::NextPostTask(*queue), + ); + } + Err(e) => { + job_failed = true; + + /* + * Try to post the error we would have reported + * to the server, but don't try too hard. + */ + cw.client + .worker_job_append() + .job(cw.job_id().unwrap()) + .body(vec![WorkerAppendJobOrTask { + task: None, + stream: JobStream::Post { + name: t.name.clone(), + } + .to_string(), + time: Utc::now(), + payload: format!("ERROR: exec: {:?}", e), + }]) + .send() + .await + .ok(); + } + } + } + Stage::Child(ch, ab, failed, next_stage) => { let a = tokio::select! { _ = pingfreq.tick() => { do_ping = true; @@ -1675,10 +1798,10 @@ async fn cmd_run(mut l: Level) -> Result<()> { match a { Some(exec::Activity::Output(o)) => { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } Some(exec::Activity::Exit(ex)) => { - cw.append_task(t, ex.to_record()).await; + cw.append_activity(ab, ex.to_record()).await; /* * Preserve the exit status for when we record task @@ -1697,21 +1820,28 @@ async fn cmd_run(mut l: Level) -> Result<()> { */ for a in bgprocs.killall().await { if let exec::Activity::Output(o) = a { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } } /* * Record completion of this task within the job. */ - cw.task_complete(t, failed.unwrap()).await; + if let ActivityBuilder::Task(id) = ab { + cw.task_complete(*id, failed.unwrap()).await; + } - stage = Stage::NextTask; + stage = match next_stage { + NextStage::NextTask => Stage::NextTask, + NextStage::NextPostTask(queue) => { + Stage::NextPostTask(*queue) + } + }; } None => { for a in bgprocs.killall().await { if let exec::Activity::Output(o) = a { - cw.append_task(t, o.to_record()).await; + cw.append_activity(ab, o.to_record()).await; } } @@ -1868,6 +1998,28 @@ async fn cmd_run(mut l: Level) -> Result<()> { } } +fn add_post_task(queue: &mut VecDeque, task: &Process) -> PayloadRes { + let name = &task.name; + + if queue.len() >= QUOTA_POST_TASKS_PER_TYPE { + return PayloadRes::Error(format!( + "cannot register more than {QUOTA_POST_TASKS_PER_TYPE} post tasks" + )); + } + if queue.iter().any(|p| p.name == task.name) { + return PayloadRes::Error(format!( + "a post task named {name:?} already exists", + )); + } + + if let Err(err) = task.validate() { + return PayloadRes::Error(err); + } + + queue.push_back(task.clone()); + PayloadRes::Ack +} + #[tokio::main] async fn main() -> Result<()> { let cmdname = std::env::args().next().as_deref().and_then(|s| { diff --git a/bin/src/main.rs b/bin/src/main.rs index 96f0fd49..4a3a71f3 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -644,33 +644,29 @@ async fn poll_job(l: &Level, id: &str, json: bool) -> Result<()> { loop { match wat.recv().await { Some(Ok(buildomat_client::EventOrState::Event(e))) => { - if json { - println!("{}", serde_json::to_string(&e)?); - } else if e.stream == "stdout" || e.stream == "stderr" { - println!("{}", e.payload); - } else if e.stream == "control" { - println!("|=| {}", e.payload); - } else if e.stream == "worker" { - println!("|W| {}", e.payload); - } else if e.stream == "task" { - println!("|T| {}", e.payload); - } else if e.stream == "console" { - println!("|C| {}", e.payload); - } else if e.stream == "panic" { - println!("|!| {}", e.payload); - } else if e.stream.starts_with("bg.") { - let t = e.stream.split('.').collect::>(); - if t.len() == 3 { - if t[2] == "stdout" || t[2] == "stderr" { - println!("[{}] {}", t[1], e.payload); - } else { - println!("{:?}", e); - } - } else { - println!("{:?}", e); + let stream = JobStream::from_str(&e.stream); + let p = &e.payload; + match stream { + _ if json => println!("{}", serde_json::to_string(&e)?), + JobStream::BgStderr { name } => println!("[{name}] {p}"), + JobStream::BgStdout { name } => println!("[{name}] {p}"), + JobStream::Console => println!("|C| {p}"), + JobStream::Control => println!("|=| {p}"), + JobStream::Panic => println!("|!| {p}"), + JobStream::Post { .. } => println!("|P| {p}"), + JobStream::Task => println!("|T| {p}"), + JobStream::Worker => println!("|W| {p}"), + JobStream::Stderr + | JobStream::Stdout + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } => { + println!("{p}") } - } else { - println!("{:?}", e); + JobStream::Agent + | JobStream::Bg { .. } + | JobStream::Diag { .. } + | JobStream::Error + | JobStream::Unknown(_) => println!("{e:?}"), } } Some(Ok(buildomat_client::EventOrState::State(st))) => { diff --git a/common/src/job_streams.rs b/common/src/job_streams.rs new file mode 100644 index 00000000..f2bbbf08 --- /dev/null +++ b/common/src/job_streams.rs @@ -0,0 +1,103 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum JobStream { + Agent, + Bg { name: String }, + BgStderr { name: String }, + BgStdout { name: String }, + Console, + Control, + Diag { name: String }, + Error, + Panic, + Post { name: String }, + PostStderr { name: String }, + PostStdout { name: String }, + Stderr, + Stdout, + Task, + Worker, + Unknown(String), +} + +impl JobStream { + pub fn from_str(stream: &str) -> JobStream { + let parts = stream.split('.').collect::>(); + match parts.as_slice() { + ["agent"] => JobStream::Agent, + ["bg", name] => JobStream::Bg { name: name.to_string() }, + ["bg", name, "stdout"] => { + JobStream::BgStdout { name: name.to_string() } + } + ["bg", name, "stderr"] => { + JobStream::BgStderr { name: name.to_string() } + } + ["console"] => JobStream::Console, + ["control"] => JobStream::Control, + ["diag", name] => JobStream::Diag { name: name.to_string() }, + ["error"] => JobStream::Error, + ["panic"] => JobStream::Panic, + ["post", name] => JobStream::Post { name: name.to_string() }, + ["post", name, "stderr"] => { + JobStream::PostStderr { name: name.to_string() } + } + ["post", name, "stdout"] => { + JobStream::PostStdout { name: name.to_string() } + } + ["stderr"] => JobStream::Stderr, + ["stdout"] => JobStream::Stdout, + ["task"] => JobStream::Task, + ["worker"] => JobStream::Worker, + _ => JobStream::Unknown(stream.to_string()), + } + } + + pub fn is_output(&self) -> bool { + match self { + JobStream::BgStderr { .. } + | JobStream::BgStdout { .. } + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } + | JobStream::Stderr + | JobStream::Stdout => true, + JobStream::Agent + | JobStream::Bg { .. } + | JobStream::Console + | JobStream::Control + | JobStream::Diag { .. } + | JobStream::Error + | JobStream::Panic + | JobStream::Post { .. } + | JobStream::Task + | JobStream::Worker + | JobStream::Unknown(_) => false, + } + } +} + +impl std::fmt::Display for JobStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobStream::Agent => write!(f, "agent"), + JobStream::Bg { name } => write!(f, "bg.{name}"), + JobStream::BgStderr { name } => write!(f, "bg.{name}.stderr"), + JobStream::BgStdout { name } => write!(f, "bg.{name}.stdout"), + JobStream::Console => f.write_str("console"), + JobStream::Control => f.write_str("control"), + JobStream::Diag { name } => write!(f, "diag.{name}"), + JobStream::Error => f.write_str("error"), + JobStream::Panic => f.write_str("panic"), + JobStream::Post { name } => write!(f, "post.{name}"), + JobStream::PostStderr { name } => write!(f, "post.{name}.stderr"), + JobStream::PostStdout { name } => write!(f, "post.{name}.stdout"), + JobStream::Stderr => f.write_str("stderr"), + JobStream::Stdout => f.write_str("stdout"), + JobStream::Task => f.write_str("task"), + JobStream::Worker => f.write_str("worker"), + JobStream::Unknown(s) => f.write_str(s), + } + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 88ea9656..dda5f806 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,8 @@ * Copyright 2025 Oxide Computer Company */ +mod job_streams; + use std::io::{IsTerminal, Read}; use std::path::Path; use std::sync::{Mutex, OnceLock}; @@ -16,6 +18,8 @@ use rusty_ulid::Ulid; use serde::{Deserialize, Serialize}; use slog::{o, Drain, Logger}; +pub use job_streams::JobStream; + pub fn read_toml, T>(n: P) -> Result where for<'de> T: Deserialize<'de>, diff --git a/github/server/src/variety/basic.rs b/github/server/src/variety/basic.rs index 1a24a35c..acea5c06 100644 --- a/github/server/src/variety/basic.rs +++ b/github/server/src/variety/basic.rs @@ -456,74 +456,46 @@ pub(crate) async fn run( p.event_minseq = ev.seq + 1; } - let stdio = ev.stream == "stdout" || ev.stream == "stderr"; - let console = ev.stream == "console"; - let panic = ev.stream == "panic"; - let worker = ev.stream == "worker"; - let bgproc = ev.stream.starts_with("bg."); - - if stdio || console || panic { - /* - * Some commands, like "cargo build --verbose", generate - * exceptionally long output lines, running into the - * thousands of characters. The long lines present two - * challenges: they are not readily visible without - * horizontal scrolling in the GitHub UI; the maximum status - * message length GitHub will accept is 64KB, and even a - * small number of long lines means our status update will - * not be accepted. - * - * If a line is longer than 90 characters, truncate it. - * Users will still be able to see the full output in our - * detailed view where we get to render the whole page. - */ - let mut line = if console { - "|C| " - } else if panic { - "|!|" - } else { - "| " + match JobStream::from_str(&ev.stream) { + JobStream::Stderr | JobStream::Stdout => { + let l = format!("| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); } - .to_string(); - - /* - * We support ANSI escapes in the log renderer, which means - * that tools will generate ANSI sequences. That doesn't - * work in the GitHub renderer, so we need to strip them out - * entirely. - */ - let payload = strip_ansi_escapes::strip_str(&ev.payload); - let mut chars = payload.chars(); - - for _ in 0..MAX_LINE_LENGTH { - if let Some(c) = chars.next() { - line.push(c); - } else { - break; - } + JobStream::Console => { + let l = format!("|C| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); } - if chars.next().is_some() { + JobStream::Panic => { + let l = format!("|!| {}", truncate_line(&ev.payload)); + p.events_tail.push_back((None, l)); + } + JobStream::Worker => { /* - * If any characters remain, the string was truncated. + * A job may produce a large number of files. We must + * not treat worker output (which is mostly about file + * uploads) as headers. They must be regular records + * that are discarded as they scroll off the top. */ - line.push_str(" [...]"); + let line = format!("|W| {}", ev.payload); + p.events_tail.push_back((None, line)); } - - p.events_tail.push_back((None, line)); - } else if worker { - /* - * A job may produce a large number of files. We must not - * treat worker output (which is mostly about file uploads - * and so on) as headers. They must be regular records that - * are discarded as they scroll off the top. - */ - let line = format!("|W| {}", ev.payload); - p.events_tail.push_back((None, line)); - } else if !bgproc { - p.events_tail.push_back(( - Some(format!("{}/{:?}", ev.stream, ev.task)), - format!("{}: {}", ev.stream, ev.payload), - )); + JobStream::Agent + | JobStream::Control + | JobStream::Diag { .. } + | JobStream::Error + | JobStream::Task + | JobStream::Unknown(_) => { + p.events_tail.push_back(( + Some(format!("{}/{:?}", ev.stream, ev.task)), + format!("{}: {}", ev.stream, ev.payload), + )); + } + JobStream::Bg { .. } + | JobStream::BgStderr { .. } + | JobStream::BgStdout { .. } + | JobStream::Post { .. } + | JobStream::PostStderr { .. } + | JobStream::PostStdout { .. } => {} } } @@ -837,6 +809,44 @@ pub(crate) async fn run( Ok(true) } +/* + * Some commands, like "cargo build --verbose", generate exceptionally long + * output lines, running into the thousands of characters. The long lines + * present two challenges: they are not readily visible without horizontal + * scrolling in the GitHub UI; the maximum status message length GitHub will + * accept is 64KB, and even a small number of long lines means our status + * update will not be accepted. + * + * If a line is longer than 90 characters, truncate it. Users will still be + * able to see the full output in our detailed view where we get to render the + * whole page. + */ +fn truncate_line(payload: &str) -> String { + /* + * We support ANSI escapes in the log renderer, which means that tools will + * generate ANSI sequences. That doesn't work in the GitHub renderer, so + * we need to strip them out entirely. + */ + let payload = strip_ansi_escapes::strip_str(payload); + let mut chars = payload.chars(); + + let mut line = String::new(); + for _ in 0..MAX_LINE_LENGTH { + if let Some(c) = chars.next() { + line.push(c); + } else { + break; + } + } + if chars.next().is_some() { + /* + * If any characters remain, the string was truncated. + */ + line.push_str(" [...]"); + } + line +} + async fn bunyan_to_html( f: &mut tokio::fs::File, dec: &mut buildomat_bunyan::BunyanDecoder, diff --git a/jobsh/src/lib.rs b/jobsh/src/lib.rs index 3989764a..51a579ea 100644 --- a/jobsh/src/lib.rs +++ b/jobsh/src/lib.rs @@ -5,25 +5,19 @@ use std::borrow::Cow; use buildomat_client::types::JobEvent; +use buildomat_common::JobStream; use chrono::SecondsFormat; -use serde::Serialize; +use serde::{Serialize, Serializer}; pub mod jobfile; pub mod variety; -/* - * Classes for these streams are defined in the "variety/basic/www/style.css", - * which we send along with the generated HTML output. - */ -const CSS_STREAM_CLASSES: &[&str] = - &["stdout", "stderr", "task", "worker", "control", "console", "panic"]; - pub trait JobEventEx { /** * Choose a colour (CSS class name) for the stream to which this event * belongs. */ - fn css_class(&self) -> String; + fn css_class(&self) -> &'static str; /** * Turn a job event into a somewhat abstract object with pre-formatted HTML @@ -34,15 +28,25 @@ pub trait JobEventEx { } impl JobEventEx for JobEvent { - fn css_class(&self) -> String { - let s = self.stream.as_str(); - - if CSS_STREAM_CLASSES.contains(&s) { - format!("s_{s}") - } else if s.starts_with("bg.") { - "s_bgtask".into() - } else { - "s_default".into() + fn css_class(&self) -> &'static str { + match JobStream::from_str(&self.stream) { + JobStream::Console => "s_console", + JobStream::Control => "s_control", + JobStream::Panic => "s_panic", + JobStream::Post { .. } => "s_post", + JobStream::PostStderr { .. } => "s_post_stderr", + JobStream::PostStdout { .. } => "s_post_stdout", + JobStream::Stderr => "s_stderr", + JobStream::Stdout => "s_stdout", + JobStream::Task => "s_task", + JobStream::Worker => "s_worker", + JobStream::Bg { .. } + | JobStream::BgStderr { .. } + | JobStream::BgStdout { .. } => "s_bgtask", + JobStream::Agent + | JobStream::Error + | JobStream::Diag { .. } + | JobStream::Unknown(_) => "s_default", } } @@ -67,9 +71,22 @@ impl JobEventEx for JobEvent { encode_payload(&self.payload), ); + let section = if let Some(id) = self.task { + EventSection::Task(id) + } else if let Some(post) = self.stream.strip_prefix("post.") { + EventSection::Post( + post.split_once('.') + .map(|(name, _)| name) + .unwrap_or(post) + .into(), + ) + } else { + EventSection::None + }; + EventRow { - task: self.task, css_class: self.css_class(), + section, fields: vec![ EventField { css_class: "num", @@ -136,8 +153,8 @@ fn encode_payload(payload: &str) -> Cow<'_, str> { #[derive(Debug, Serialize)] pub struct EventRow { - task: Option, - css_class: String, + css_class: &'static str, + section: EventSection, fields: Vec, } @@ -153,6 +170,23 @@ pub struct EventField { anchor: Option, } +#[derive(Debug, PartialEq, Eq)] +enum EventSection { + None, + Task(u32), + Post(String), +} + +impl Serialize for EventSection { + fn serialize(&self, s: S) -> Result { + match self { + EventSection::None => s.serialize_str("none"), + EventSection::Task(idx) => s.serialize_str(&format!("task:{idx}")), + EventSection::Post(n) => s.serialize_str(&format!("post:{n}")), + } + } +} + #[cfg(test)] pub mod test { use super::*; diff --git a/jobsh/src/variety/basic.rs b/jobsh/src/variety/basic.rs index a5c32e67..cf5d05ec 100644 --- a/jobsh/src/variety/basic.rs +++ b/jobsh/src/variety/basic.rs @@ -13,7 +13,7 @@ use futures::future::BoxFuture; use hyper::Response; use serde::{Deserialize, Serialize}; -use crate::JobEventEx; +use crate::{EventSection, JobEventEx}; /* * We can use "deny_unknown_fields" here because the global frontmatter fields @@ -56,7 +56,7 @@ pub async fn output_table( ) -> Result { let mut out = "\n".to_string(); - let mut last = None; + let mut last = EventSection::None; out += "\n"; out += "\n"; @@ -112,10 +112,10 @@ pub async fn output_table( let evr = ev.event_row(); /* - * If the task has changed, render a full-width blank row in the + * If the section has changed, render a full-width blank row in the * table: */ - if evr.task != last { + if evr.section != last { let cols = evr .fields .iter() @@ -124,7 +124,7 @@ pub async fn output_table( out += &format!(""); } - last = evr.task; + last = evr.section; out += &format!("\n", evr.css_class); diff --git a/variety/basic/www/live.js b/variety/basic/www/live.js index d717eba8..8375433a 100644 --- a/variety/basic/www/live.js +++ b/variety/basic/www/live.js @@ -4,7 +4,7 @@ var EVT; var LOCAL_TIME = %LOCAL_TIME%; -var LAST_TASK = null; +var LAST_SECTION = null; var LAST_EVENT = null; /* @@ -68,9 +68,11 @@ basic_row(ev) } /* - * If the task has changed, render a full-width blank row in the table: + * If the section has changed, render a full-width blank row in the table: */ - if (evr.task !== LAST_TASK) { + if (LAST_SECTION === null) { + LAST_SECTION = evr.section; + } else if (evr.section !== LAST_SECTION) { let count = 0; for (let i = 0; i < evr.fields.length; i++) { let f = evr.fields[i]; @@ -87,7 +89,7 @@ basic_row(ev) tr.appendChild(td); tbl.appendChild(td); - LAST_TASK = evr.task; + LAST_SECTION = evr.section; } let tr = document.createElement("tr"); diff --git a/variety/basic/www/style.css b/variety/basic/www/style.css index 469f841c..c64bd5f5 100644 --- a/variety/basic/www/style.css +++ b/variety/basic/www/style.css @@ -76,6 +76,18 @@ tr.s_default { background-color: #dddddd; } +tr.s_post { + background-color: #ffcceb; +} + +tr.s_post_stdout { + background-color: #fff0f9; +} + +tr.s_post_stderr { + background-color: #ffe5f5; +} + span.header { white-space: pre; font-family: monospace;
SEQ