Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ root = true

[*]
max_line_length = 80

[*.js]
indent_style = tab
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ version = "0.0.0"
[workspace.lints.clippy]
identity_op = "allow"
many_single_char_names = "allow"
should_implement_trait = "allow"
too_many_arguments = "allow"
type_complexity = "allow"
vec_init_then_push = "allow"
Expand Down
103 changes: 81 additions & 22 deletions agent/src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
};

use protocol::{
Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes,
Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes, Process,
};

pub(crate) mod protocol;
Expand Down Expand Up @@ -123,6 +123,7 @@ pub async fn main() -> Result<()> {
l.cmd("store", "access the job store", cmd!(cmd_store))?;
l.cmd("address", "manage IP addresses for this job", cmd!(cmd_address))?;
l.cmd("process", "manage background processes", cmd!(cmd_process))?;
l.cmd("post", "manage post tasks", cmd!(cmd_post))?;
l.cmd("factory", "factory information for this worker", cmd!(cmd_factory))?;
l.hcmd("eng", "for working on and testing buildomat", cmd!(cmd_eng))?;

Expand Down Expand Up @@ -374,6 +375,62 @@ async fn cmd_store_put(mut l: Level<Stuff>) -> Result<()> {
}
}

async fn cmd_post(mut l: Level<Stuff>) -> Result<()> {
l.context_mut().connect().await?;

l.cmd(
"success",
"add a post task running on success",
cmd!(cmd_post_success),
)?;
l.cmd(
"failure",
"add a post task running on failure",
cmd!(cmd_post_failure),
)?;
l.cmd("always", "add a post task that always runs", cmd!(cmd_post_always))?;

sel!(l).run().await
}

async fn cmd_post_success(l: Level<Stuff>) -> Result<()> {
post_inner(l, &[PayloadReq::PostSuccess]).await
}

async fn cmd_post_failure(l: Level<Stuff>) -> Result<()> {
post_inner(l, &[PayloadReq::PostFailure]).await
}

async fn cmd_post_always(l: Level<Stuff>) -> Result<()> {
post_inner(l, &[PayloadReq::PostSuccess, PayloadReq::PostFailure]).await
}

async fn post_inner(
mut l: Level<Stuff>,
payloads: &[fn(Process) -> PayloadReq],
) -> Result<()> {
l.usage_args(Some("NAME COMMAND [ARGS...]"));

let a = args!(l);

for payload in payloads {
let req = payload(build_process(&l, a.args())?);
match l.context_mut().req(req).await? {
PayloadRes::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not enqueue post task {:?}: {e}", a.args()[0]);
}
PayloadRes::Ack => {}
other => bail!("unexpected response: {other:?}"),
}
}

Ok(())
}

async fn cmd_process(mut l: Level<Stuff>) -> Result<()> {
l.context_mut().connect().await?;

Expand All @@ -387,14 +444,29 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {

let a = args!(l);

if a.args().len() < 2 {
let req = PayloadReq::ProcessStart(build_process(&l, a.args())?);
match l.context_mut().req(req).await? {
PayloadRes::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not start process: {e}");
}
PayloadRes::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}

fn build_process(l: &Level<Stuff>, args: &[String]) -> Result<Process> {
if args.len() < 2 {
bad_args!(l, "specify at least a process name and a command to run");
}

let payload = PayloadReq::ProcessStart {
name: a.args()[0].to_string(),
cmd: a.args()[1].to_string(),
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),
Ok(Process {
name: args[0].clone(),
cmd: args[1].clone(),
args: args[2..].to_vec(),

/*
* The process will actually be spawned by the agent, which is
Expand All @@ -403,24 +475,11 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {
* process can be started as if it were run from the job program
* itself.
*/
env: std::env::vars_os().collect::<Vec<_>>(),
pwd: std::env::current_dir()?.to_str().unwrap().to_string(),

env: std::env::vars_os().collect(),
pwd: std::env::current_dir()?.into_os_string(),
uid: unsafe { libc::geteuid() },
gid: unsafe { libc::getegid() },
};

match l.context_mut().req(payload).await? {
PayloadRes::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not start process: {e}");
}
PayloadRes::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
})
}

async fn factory_info(s: &mut Stuff) -> Result<FactoryInfo> {
Expand Down
41 changes: 32 additions & 9 deletions agent/src/control/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,43 @@ pub struct StoreEntry {
pub secret: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Process {
pub name: String,
pub cmd: String,
pub args: Vec<String>,
pub env: Vec<(OsString, OsString)>,
pub pwd: OsString,
pub uid: u32,
pub gid: u32,
}

impl Process {
pub fn validate(&self) -> Result<(), String> {
let name = &self.name;

if name.len() > 32 {
return Err(format!("process name {name:?} is longer than 32"));
}
if let Some(c) = name
.chars()
.find(|&c| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
{
return Err(format!("invalid char {c:?} in process name {name:?}"));
}

Ok(())
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum PayloadReq {
StoreGet(String),
StorePut(String, String, bool),
MetadataAddresses,
ProcessStart {
name: String,
cmd: String,
args: Vec<String>,
env: Vec<(OsString, OsString)>,
pwd: String,
uid: u32,
gid: u32,
},
ProcessStart(Process),
PostSuccess(Process),
PostFailure(Process),
FactoryInfo,
}

Expand Down
Loading