diff --git a/Cargo.lock b/Cargo.lock index 567ea369..9c6730b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.8.12" @@ -54,6 +60,18 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "async-compression" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -707,6 +725,7 @@ version = "0.0.0" dependencies = [ "anyhow", "chrono", + "libc", "new_mime_guess", "rand 0.9.2", "regex", @@ -866,6 +885,37 @@ dependencies = [ "zone", ] +[[package]] +name = "buildomat-factory-user" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-compression", + "base64 0.22.1", + "buildomat-client", + "buildomat-common", + "buildomat-database", + "buildomat-types", + "chrono", + "getopts", + "libc", + "rand 0.9.2", + "reqwest", + "rusty_ulid", + "schemars 0.8.22", + "sea-query", + "serde", + "serde_json", + "slog", + "slog-term", + "smf", + "strum", + "tempfile", + "tokio", + "toml 0.8.23", + "usdt", +] + [[package]] name = "buildomat-github-client" version = "0.0.0" @@ -1158,6 +1208,23 @@ dependencies = [ "cc", ] +[[package]] +name = "compression-codecs" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + 
+[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "console" version = "0.16.3" @@ -1679,6 +1746,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "flate2" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2657,6 +2734,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -4014,6 +4101,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simd-adler32" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" + [[package]] name = "simple_asn1" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 35bd8401..6e11c937 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "download", "factory/aws", "factory/gimlet", + "factory/user", "factory/lab", "factory/propolis", "github/client", @@ -45,6 +46,7 @@ wrong_self_convention = "allow" [workspace.dependencies] ansi-to-html = { version = "0.2", features = [ "lazy-init" ] } anyhow = "1" +async-compression = { version = "0.4", features = ["gzip", "tokio"] } aws-config = "1" 
aws-credential-types = "1" aws-runtime = "1" diff --git a/agent/src/main.rs b/agent/src/main.rs index d4e617f5..0e1db574 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -2,7 +2,7 @@ * Copyright 2024 Oxide Computer Company */ -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::env; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind::NotFound, Write}; @@ -13,7 +13,7 @@ use std::process::Command; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use chrono::prelude::*; use futures::StreamExt; use hiercmd::prelude::*; @@ -26,6 +26,7 @@ use tokio::sync::oneshot; use tokio::time::MissedTickBehavior; use buildomat_client::types::*; +use buildomat_common::unix::{Passwd, Uid}; use buildomat_common::*; use buildomat_types::*; @@ -42,31 +43,39 @@ struct Agent { log: Logger, } +const PATH: &[&str] = &[ + "/opt/buildomat/bin", + "/usr/bin", + "/bin", + "/usr/sbin", + "/sbin", + #[cfg(target_os = "illumos")] + "/opt/ooce/bin", + #[cfg(target_os = "illumos")] + "/opt/ooce/sbin", +]; + const CONFIG_PATH: &str = "/opt/buildomat/etc/agent.json"; const JOB_PATH: &str = "/opt/buildomat/etc/job.json"; const AGENT: &str = "/opt/buildomat/lib/agent"; const INPUT_PATH: &str = "/input"; const CONTROL_PROGRAM: &str = "bmat"; +const CONTROL_PROGRAM_PATH: &str = "/opt/buildomat/bin/bmat"; const SHADOW: &str = "/etc/shadow"; -#[cfg(target_os = "illumos")] -mod os_constants { - pub const METHOD: &str = "/opt/buildomat/lib/start.sh"; - pub const MANIFEST: &str = "/var/svc/manifest/site/buildomat-agent.xml"; - pub const INPUT_DATASET: &str = "rpool/input"; -} -#[cfg(target_os = "linux")] -mod os_constants { - pub const UNIT: &str = "/etc/systemd/system/buildomat-agent.service"; -} -use os_constants::*; +const ILLUMOS_METHOD: &str = "/opt/buildomat/lib/start.sh"; +const ILLUMOS_MANIFEST: &str = "/var/svc/manifest/site/buildomat-agent.xml"; +const ILLUMOS_INPUT_DATASET: &str = 
"rpool/input"; +const LINUX_UNIT: &str = "/etc/systemd/system/buildomat-agent.service"; use crate::control::protocol::StoreEntry; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] struct ConfigFile { baseurl: String, bootstrap: String, token: String, + setuid: bool, + env: HashMap, } impl ConfigFile { @@ -88,7 +97,14 @@ impl ConfigFile { rx, )); - ClientWrap { log, client, job: None, tx: None, worker_tx } + ClientWrap { + log, + config: self.clone(), + client, + job: None, + tx: None, + worker_tx, + } } } @@ -274,6 +290,7 @@ async fn append_worker_worker( #[derive(Clone)] pub(crate) struct ClientWrap { log: Logger, + config: ConfigFile, client: buildomat_client::Client, job: Option>, tx: Option>, @@ -631,6 +648,19 @@ impl ClientWrap { let mut cmd = Command::new("/bin/bash"); cmd.arg(&s); + cmd.current_dir("/"); + + let passwd = if self.config.setuid { + /* + * Run the diagnostic script as root: + */ + Passwd::by_name("root")? + .ok_or_else(|| anyhow!("missing root user"))? + } else { + Passwd::current_user()? + }; + cmd.uid(passwd.uid.0); + cmd.gid(passwd.gid.0); /* * The diagnostic task should have a pristine and reproducible @@ -638,23 +668,24 @@ impl ClientWrap { */ cmd.env_clear(); - cmd.env("HOME", "/root"); - cmd.env("USER", "root"); - cmd.env("LOGNAME", "root"); + if let Some(dir) = &passwd.dir { + cmd.env("HOME", dir); + } + if let Some(name) = &passwd.name { + cmd.env("USER", name); + cmd.env("LOGNAME", name); + } cmd.env("TZ", "UTC"); - cmd.env( - "PATH", - "/usr/bin:/bin:/usr/sbin:/sbin:/opt/ooce/bin:/opt/ooce/sbin", - ); + cmd.env("PATH", std::env::join_paths(PATH)?); cmd.env("LANG", "en_US.UTF-8"); cmd.env("LC_ALL", "en_US.UTF-8"); /* - * Run the diagnostic script as root: + * Add variables configured at agent installation time. 
*/ - cmd.current_dir("/"); - cmd.uid(0); - cmd.gid(0); + for (k, v) in &self.config.env { + cmd.env(k, v); + } match exec::run(cmd, ActivityBuilder::Diag(name.into())) { Ok(c) => Ok(c), @@ -1022,11 +1053,30 @@ enum Stage { async fn cmd_install(mut l: Level) -> Result<()> { l.usage_args(Some("BASEURL BOOTSTRAP_TOKEN")); l.optopt("N", "", "set nodename of machine", "NODENAME"); + l.optmulti("e", "", "add environment variables", "KEY=VALUE"); + l.optflag("", "no-service", "avoid creating the system service"); + l.optflag("", "no-setuid", "prevent the agent from changing users"); let a = args!(l); + let start_service = !a.opts().opt_present("no-service"); + let setuid = !a.opts().opt_present("no-setuid"); let log = l.context().log.clone(); - if a.args().len() < 2 { + /* + * Parse "-e KEY=VALUE" into keys and values. + */ + let env = a + .opts() + .opt_strs("e") + .into_iter() + .map(|v| { + v.split_once('=') + .map(|(k, v)| (k.to_string(), v.to_string())) + .ok_or_else(|| anyhow!("invalid flag: -e {v}")) + }) + .collect::, _>>()?; + + if a.args().len() != 2 { bad_args!(l, "specify base URL and bootstrap token value"); } @@ -1048,7 +1098,7 @@ async fn cmd_install(mut l: Level) -> Result<()> { */ make_dirs_for(CONFIG_PATH)?; rmfile(CONFIG_PATH)?; - let cf = ConfigFile { baseurl, bootstrap, token: genkey(64) }; + let cf = ConfigFile { baseurl, bootstrap, token: genkey(64), setuid, env }; store(CONFIG_PATH, &cf)?; /* @@ -1064,25 +1114,24 @@ async fn cmd_install(mut l: Level) -> Result<()> { * Install the agent binary with the control program name in a location in * the default PATH so that job programs can find it. 
*/ - let cprog = format!("/usr/bin/{CONTROL_PROGRAM}"); - rmfile(&cprog)?; - std::fs::copy(&exe, &cprog)?; - make_executable(&cprog)?; + make_dirs_for(CONTROL_PROGRAM_PATH)?; + rmfile(CONTROL_PROGRAM_PATH)?; + std::fs::copy(&exe, CONTROL_PROGRAM_PATH)?; + make_executable(CONTROL_PROGRAM_PATH)?; - #[cfg(target_os = "illumos")] - { + if cfg!(target_os = "illumos") && start_service { /* * Copy SMF method script and manifest into place. */ let method = include_str!("../smf/start.sh"); - make_dirs_for(METHOD)?; - rmfile(METHOD)?; - write_text(METHOD, method)?; - make_executable(METHOD)?; + make_dirs_for(ILLUMOS_METHOD)?; + rmfile(ILLUMOS_METHOD)?; + write_text(ILLUMOS_METHOD, method)?; + make_executable(ILLUMOS_METHOD)?; let manifest = include_str!("../smf/agent.xml"); - rmfile(MANIFEST)?; - write_text(MANIFEST, manifest)?; + rmfile(ILLUMOS_MANIFEST)?; + write_text(ILLUMOS_MANIFEST, manifest)?; /* * Create the input directory. @@ -1091,7 +1140,7 @@ async fn cmd_install(mut l: Level) -> Result<()> { .arg("create") .arg("-o") .arg(format!("mountpoint={INPUT_PATH}")) - .arg(INPUT_DATASET) + .arg(ILLUMOS_INPUT_DATASET) .env_clear() .current_dir("/") .status(); @@ -1106,7 +1155,7 @@ async fn cmd_install(mut l: Level) -> Result<()> { */ let status = Command::new("/usr/sbin/svccfg") .arg("import") - .arg(MANIFEST) + .arg(ILLUMOS_MANIFEST) .env_clear() .current_dir("/") .status(); @@ -1115,10 +1164,7 @@ async fn cmd_install(mut l: Level) -> Result<()> { Ok(o) => bail!("svccfg import failure: {:?}", o), Err(e) => bail!("could not execute svccfg import: {:?}", e), } - } - - #[cfg(target_os = "linux")] - { + } else if cfg!(target_os = "linux") && start_service { /* * Create the input directory. */ @@ -1128,9 +1174,9 @@ async fn cmd_install(mut l: Level) -> Result<()> { * Write a systemd unit file for the agent service. 
*/ let unit = include_str!("../systemd/agent.service"); - make_dirs_for(UNIT)?; - rmfile(UNIT)?; - write_text(UNIT, unit)?; + make_dirs_for(LINUX_UNIT)?; + rmfile(LINUX_UNIT)?; + write_text(LINUX_UNIT, unit)?; /* * Ask systemd to load the unit file we just wrote. @@ -1160,17 +1206,6 @@ async fn cmd_install(mut l: Level) -> Result<()> { Ok(o) => bail!("systemd start failure: {:?}", o), Err(e) => bail!("could not execute systemctl: {:?}", e), } - - /* - * Ubuntu 18.04 had a genuine pre-war separate /bin directory! - */ - let binmd = std::fs::symlink_metadata("/bin")?; - if binmd.is_dir() { - std::os::unix::fs::symlink( - format!("../usr/bin/{CONTROL_PROGRAM}"), - format!("/bin/{CONTROL_PROGRAM}"), - )?; - } } Ok(()) @@ -1590,6 +1625,25 @@ async fn cmd_run(mut l: Level) -> Result<()> { let mut cmd = Command::new("/bin/bash"); cmd.arg(&s); + /* + * Each task may be expected to run under a different user + * account or with a different working directory. + */ + cmd.current_dir(&t.workdir); + let passwd = if cf.setuid { + cmd.uid(t.uid); + cmd.gid(t.gid); + Passwd::by_uid(Uid(t.uid))?.ok_or_else(|| { + anyhow!("no user on the system with uid {}", t.uid) + })? + } else { + /* + * XXX should we handle when a different uid/gid is + * requested? + */ + Passwd::current_user()? + }; + /* * The user task should have a pristine and reproducible * environment that does not leak in artefacts of the @@ -1600,21 +1654,27 @@ async fn cmd_run(mut l: Level) -> Result<()> { /* * Absent a request for an entirely clean slate, we * set a few specific environment variables. 
- * XXX HOME/USER/LOGNAME should probably respect "uid" */ - cmd.env("HOME", "/root"); - cmd.env("USER", "root"); - cmd.env("LOGNAME", "root"); + if let Some(name) = &passwd.name { + cmd.env("USER", name); + cmd.env("LOGNAME", name); + } + if let Some(dir) = &passwd.dir { + cmd.env("HOME", dir); + } cmd.env("TZ", "UTC"); - cmd.env( - "PATH", - "/usr/bin:/bin:/usr/sbin:/sbin:\ - /opt/ooce/bin:/opt/ooce/sbin", - ); + cmd.env("PATH", std::env::join_paths(PATH)?); cmd.env("LANG", "en_US.UTF-8"); cmd.env("LC_ALL", "en_US.UTF-8"); cmd.env("BUILDOMAT_JOB_ID", cw.job_id().unwrap()); cmd.env("BUILDOMAT_TASK_ID", t.id.to_string()); + + /* + * Add variables configured at agent installation time. + */ + for (k, v) in &cf.env { + cmd.env(k, v); + } } for (k, v) in t.env.iter() { /* @@ -1625,14 +1685,6 @@ async fn cmd_run(mut l: Level) -> Result<()> { cmd.env(k, v); } - /* - * Each task may be expected to run under a different user - * account or with a different working directory. - */ - cmd.current_dir(&t.workdir); - cmd.uid(t.uid); - cmd.gid(t.gid); - match exec::run(cmd, ActivityBuilder::Task) { Ok(c) => { stage = Stage::Child(c, t, None); diff --git a/client/openapi.json b/client/openapi.json index e924e604..5fd823c5 100644 --- a/client/openapi.json +++ b/client/openapi.json @@ -922,6 +922,42 @@ } } }, + "/0/factory/worker/{worker}/fail": { + "post": { + "operationId": "factory_worker_fail", + "parameters": [ + { + "in": "path", + "name": "worker", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/FactoryWorkerFail" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/0/factory/worker/{worker}/flush": { "post": { "operationId": "factory_worker_flush", @@ -2943,6 +2979,17 @@ 
"target" ] }, + "FactoryWorkerFail": { + "type": "object", + "properties": { + "reason": { + "type": "string" + } + }, + "required": [ + "reason" + ] + }, "FactoryWorkerResult": { "type": "object", "properties": { diff --git a/common/Cargo.toml b/common/Cargo.toml index 26497bfa..4d0700ec 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -13,6 +13,7 @@ doctest = false [dependencies] anyhow = { workspace = true } chrono = { workspace = true } +libc = { workspace = true } new_mime_guess = { workspace = true } rand = { workspace = true } regex = { workspace = true } diff --git a/common/src/lib.rs b/common/src/lib.rs index dda5f806..3f64dcc7 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,6 +4,8 @@ mod job_streams; +pub mod unix; + use std::io::{IsTerminal, Read}; use std::path::Path; use std::sync::{Mutex, OnceLock}; diff --git a/common/src/unix.rs b/common/src/unix.rs new file mode 100644 index 00000000..2d1d31ab --- /dev/null +++ b/common/src/unix.rs @@ -0,0 +1,105 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use std::ffi::{CStr, CString}; +use std::io::{Error as IoError, ErrorKind}; +use std::path::PathBuf; + +pub struct Passwd { + pub uid: Uid, + pub gid: Gid, + pub name: Option, + pub dir: Option, +} + +impl Passwd { + pub fn current_user() -> Result { + Self::by_uid(getuid())? + .ok_or_else(|| anyhow!("missing passwd entry for the current user")) + } + + pub fn by_name(name: &str) -> Result> { + let name = CString::new(name.to_string())?; + Self::from_libc(catch_errno(|| unsafe { + libc::getpwnam(name.as_ptr()) + })?) + } + + pub fn by_uid(uid: Uid) -> Result> { + Self::from_libc(catch_errno(|| unsafe { libc::getpwuid(uid.0) })?) 
+ } + + fn from_libc(ptr: *const libc::passwd) -> Result> { + let passwd = match unsafe { ptr.as_ref() } { + Some(p) => p, + None => return Ok(None), + }; + + Ok(Some(Passwd { + uid: Uid(passwd.pw_uid), + gid: Gid(passwd.pw_gid), + name: if passwd.pw_name.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(passwd.pw_name) }; + Some(cstr.to_str()?.into()) + }, + dir: if passwd.pw_dir.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(passwd.pw_dir) }; + Some(cstr.to_str()?.into()) + }, + })) + } +} + +pub fn getuid() -> Uid { + Uid(unsafe { libc::getuid() }) +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, +)] +#[serde(transparent)] +pub struct Uid(pub u32); + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, +)] +#[serde(transparent)] +pub struct Gid(pub u32); + +fn catch_errno T>(f: F) -> Result { + loop { + clear_errno(); + let result = f(); + let err = IoError::last_os_error(); + if err.raw_os_error() == Some(0) { + return Ok(result); + } else if let ErrorKind::Interrupted = err.kind() { + continue; + } else { + return Err(err); + } + } +} + +#[cfg(target_os = "illumos")] +fn clear_errno() { + unsafe { + let errno = libc::___errno(); + *errno = 0; + } +} + +#[cfg(target_os = "linux")] +fn clear_errno() { + unsafe { + let errno = libc::__errno_location(); + *errno = 0; + } +} diff --git a/factory/user/Cargo.toml b/factory/user/Cargo.toml new file mode 100644 index 00000000..bfa56232 --- /dev/null +++ b/factory/user/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "buildomat-factory-user" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +buildomat-common = { path = "../../common" } +buildomat-database = { path = "../../database" } +buildomat-client = { path = "../../client" } +buildomat-types = { path = "../../types" } + +anyhow = { workspace = true } +async-compression = { 
workspace = true } +base64 = { workspace = true } +chrono = { workspace = true } +getopts = { workspace = true } +libc = { workspace = true } +rand = { workspace = true } +reqwest = { workspace = true } +rusty_ulid = { workspace = true } +schemars = { workspace = true } +sea-query = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +slog = { workspace = true } +slog-term = { workspace = true } +smf = { workspace = true } +strum = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } +toml = { workspace = true } +usdt = { workspace = true } diff --git a/factory/user/README.md b/factory/user/README.md new file mode 100644 index 00000000..c188ef4d --- /dev/null +++ b/factory/user/README.md @@ -0,0 +1,62 @@ +# User Factory + +The user factory creates unprivileged users on a shared illumos machine to run +jobs. The factory ensures each job runs in an ephemeral system user with no +privileges and no ability to write files outside of allowlisted directories, and +cleans up any leftover files at the end of the job. + +It currently only supports running on illumos, and requires that the factory +runs with the `Primary Administrator` profile. + +## Slots + +The factory configuration has to define one or more named "slots". Each slot +has a buildomat target it supports, plus optional configuration to define its +execution environment. When the a job comes in from the buildomat core server, +the factory will only accept it if there is a free slot for that job's target. +This means the number of slots is also the concurrency limit on that system. + +The minimal slot configuration requires only a slot name and the ID of the +buildomat target the slot supports: + +```toml +[slots.NAME] +target = "TARGET_ID" +``` + +Additional configuration options are available: + +* **`add_to_groups = ["GROUP", "GROUP"]`**: add the ephemeral system user to + these groups in addition to the primary (ephemeral) group. 
This can be useful + to allow jobs running in the slot to access system capabilities guarded by + group membership. + +* **`env.KEY = "VALUE"`**: set arbitrary environment variables in jobs executed + inside of this slot. + +An example of a full slot configuration: + +```toml +[slots.hubris-1] +target = "..." +env.HUMILITY_ENVIRONMENT = "/opt/ci/humility/1.json" +add_to_groups = ["usb"] +``` + +## Isolation + +Each job runs as an ephemeral user, with randomly generated UID and GID, +guaranteed not to collide with any UID and GID currently in use on the system. +The name of the user is the worker ID prefixed by `bmat-`. + +The whole filesystem is enforced to be read-only, except for a few allowlisted +directories. All writeable directories are empty at the time the job starts, and +are purged at the end of the job. The following directiories are writeable: + +* `/home/$USER` +* `/input` +* `/opt/buildomat` (used by the buildomat agent) +* `/tmp` +* `/var/run` (used by the buildomat agent) +* `/var/tmp` +* `/work` diff --git a/factory/user/schema.sql b/factory/user/schema.sql new file mode 100644 index 00000000..6322c69e --- /dev/null +++ b/factory/user/schema.sql @@ -0,0 +1,11 @@ +-- v 1 +CREATE TABLE worker ( + id TEXT NOT NULL PRIMARY KEY, + slot TEXT NOT NULL, + state TEXT NOT NULL, + leased_job TEXT NOT NULL, + bootstrap TEXT NOT NULL +); + +-- v 2 +CREATE INDEX worker_state ON worker (state); diff --git a/factory/user/smf/worker.xml b/factory/user/smf/worker.xml new file mode 100644 index 00000000..baa0bc6e --- /dev/null +++ b/factory/user/smf/worker.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/factory/user/src/bootstrap_agent.rs b/factory/user/src/bootstrap_agent.rs new file mode 100644 index 00000000..9a527337 --- /dev/null +++ b/factory/user/src/bootstrap_agent.rs @@ -0,0 +1,199 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use anyhow::{bail, Context as _, Result}; +use async_compression::tokio::write::GzipDecoder; +use 
base64::prelude::{Engine, BASE64_STANDARD}; +use buildomat_client::prelude::StreamExt as _; +use buildomat_common::unix::{Gid, Uid}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::os::unix::fs::{chroot, PermissionsExt}; +use std::os::unix::process::CommandExt; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Duration; +use tempfile::{NamedTempFile, TempPath}; +use tokio::fs::File; +use tokio::io::BufWriter; + +pub(crate) async fn run() -> Result<()> { + let ctx = BootstrapContext::from_svcprop()?; + + /* + * We want to run the agent inside of a chroot where most of the filesystem + * is read-only and we have isolated writeable directories. The factory + * prepares the chroot for us, we just need to get into it. + */ + chroot(&ctx.chroot) + .with_context(|| format!("failed to chroot in {:?}", ctx.chroot))?; + + let agent = download_agent(&ctx).await?; + mark_executable(&agent).await?; + + loop { + let mut cmd = Command::new(&agent); + cmd.arg("install"); + for (k, v) in &ctx.env { + cmd.arg("-e").arg(format!("{k}={v}")); + } + cmd.arg("--no-service"); + cmd.arg("--no-setuid"); + cmd.arg(&ctx.baseurl).arg(&ctx.bootstrap_token); + cmd.uid(ctx.uid.0).gid(ctx.gid.0); + + let s = cmd.status().context("failed to start agent installation")?; + if s.success() { + break; + } else { + eprintln!("failed to install the agent, exited with {s}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + let mut cmd = Command::new("/opt/buildomat/lib/agent"); + cmd.arg("run"); + cmd.uid(ctx.uid.0).gid(ctx.gid.0); + cmd.spawn().context("failed to spawn the agent")?; + + Ok(()) +} + +async fn download_agent(ctx: &BootstrapContext) -> Result { + let baseurl = &ctx.baseurl; + let query = agent_query_string()?; + loop { + println!("attempting to download the gz agent..."); + match download_gz(format!("{baseurl}/file/agent.gz?{query}")).await { + Ok(dest) => return Ok(dest), + Err(e) => { + eprintln!("failed to 
download gz agent: {e}"); + } + } + + println!("attempting to download the plaintext agent..."); + match download_plain(format!("{baseurl}/file/agent?{query}")).await { + Ok(dest) => return Ok(dest), + Err(e) => { + eprintln!("failed to download plaintext agent: {e}"); + } + } + + println!("downloads failed, retrying in a second"); + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn download_gz(url: String) -> Result { + let (dest, dest_path) = NamedTempFile::new()?.into_parts(); + + let mut resp = reqwest::get(&url).await?.error_for_status()?.bytes_stream(); + let mut dest = GzipDecoder::new(BufWriter::new(File::from_std(dest))); + + while let Some(chunk) = resp.next().await { + tokio::io::copy(&mut chunk?.as_ref(), &mut dest).await.with_context( + || format!("failed to download {url:?} to {dest_path:?}"), + )?; + } + + Ok(dest_path) +} + +async fn download_plain(url: String) -> Result { + let (dest, dest_path) = NamedTempFile::new()?.into_parts(); + + let mut resp = reqwest::get(&url).await?.error_for_status()?.bytes_stream(); + let mut dest = BufWriter::new(File::from_std(dest)); + + while let Some(chunk) = resp.next().await { + tokio::io::copy(&mut chunk?.as_ref(), &mut dest).await.with_context( + || format!("failed to download {url:?} to {dest_path:?}"), + )?; + } + + Ok(dest_path) +} + +async fn mark_executable(path: &Path) -> Result<()> { + let mut perms = tokio::fs::metadata(path).await?.permissions(); + perms.set_mode(0o755); + tokio::fs::set_permissions(path, perms).await?; + Ok(()) +} + +/* + * Give the server some hints as to what OS we're running so that it can give us + * the most appropriate agent binary. 
+ */ +fn agent_query_string() -> Result { + let os_release = std::fs::read_to_string("/etc/os-release") + .context("failed to read /etc/os-release")?; + let mut os_release = os_release + .split('\n') + .map(|line| line.trim()) + .filter(|line| !line.is_empty()) + .flat_map(|line| line.split_once('=')) + .collect::>(); + + let mut query = Vec::new(); + query.push(format!("kernel={}", command_output("uname", &["-r"])?)); + query.push(format!("proc={}", command_output("uname", &["-p"])?)); + query.push(format!("mach={}", command_output("uname", &["-m"])?)); + query.push(format!("plat={}", command_output("uname", &["-i"])?)); + query.push(format!("id={}", os_release.remove("ID").unwrap_or_default())); + query.push(format!( + "id_like={}", + os_release.remove("ID_LIKE").unwrap_or_default() + )); + query.push(format!( + "version_id={}", + os_release.remove("VERSION_ID").unwrap_or_default() + )); + Ok(query.join("&")) +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct BootstrapContext { + pub(crate) baseurl: String, + pub(crate) bootstrap_token: String, + pub(crate) chroot: PathBuf, + pub(crate) uid: Uid, + pub(crate) gid: Gid, + pub(crate) env: HashMap, +} + +impl BootstrapContext { + fn from_svcprop() -> Result { + let fmri = std::env::var("SMF_FMRI") + .context("failed to get SMF_FMRI environment variable")?; + let prop = command_output("svcprop", &["-p", "buildomat/ctx", &fmri]) + .context("failed to get buildomat/ctx SMF property")?; + + /* + * The "svcprop" CLI escapes quotes when printing the property, which + * results in invalid JSON. Encoding the property as base64 avoids it. 
+ */ + let decoded = BASE64_STANDARD.decode(&prop).with_context(|| { + format!("failed to decode buildomat/ctx SMF property: {prop}") + })?; + serde_json::from_slice(&decoded).with_context(|| { + format!("failed to deserialize buildomat/ctx SMF property: {prop}") + }) + } +} + +fn command_output(cmd: &str, args: &[&str]) -> Result { + let output = Command::new(cmd) + .args(args) + .stdout(Stdio::piped()) + .spawn()? + .wait_with_output()?; + if !output.status.success() { + bail!("{cmd} failed with {}", output.status); + } + Ok(String::from_utf8(output.stdout) + .context("non-UTF-8 output")? + .trim_end_matches(['\n', '\r']) + .into()) +} diff --git a/factory/user/src/config.rs b/factory/user/src/config.rs new file mode 100644 index 00000000..1e0f5afd --- /dev/null +++ b/factory/user/src/config.rs @@ -0,0 +1,44 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub(crate) struct ConfigFile { + pub(crate) general: ConfigFileGeneral, + pub(crate) factory: ConfigFileFactory, + #[serde(default)] + pub(crate) illumos: ConfigFileIllumos, + pub(crate) slots: HashMap, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub(crate) struct ConfigFileGeneral { + pub(crate) baseurl: String, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub(crate) struct ConfigFileFactory { + pub(crate) token: String, +} + +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(deny_unknown_fields)] +pub(crate) struct ConfigFileIllumos { + pub(crate) parent_zfs_dataset: Option, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub(crate) struct ConfigFileSlot { + pub(crate) target: String, + #[serde(default)] + pub(crate) add_to_groups: Vec, + #[serde(default)] + pub(crate) env: HashMap, +} diff --git a/factory/user/src/coordinator.rs b/factory/user/src/coordinator.rs new file mode 100644 
index 00000000..fd1e76b8 --- /dev/null +++ b/factory/user/src/coordinator.rs @@ -0,0 +1,120 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::db::types::*; +use crate::Central; +use anyhow::Result; +use slog::{error, info, o, Logger}; +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; + +pub(crate) async fn coordinator_loop(c: Arc) { + let log = c.log.new(o!("component" => "coordinator")); + + let mut tasks = HashMap::new(); + loop { + if let Err(e) = coordinator_iteration(&log, &c, &mut tasks).await { + error!(log, "coordinator error: {e:?}"); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn coordinator_iteration( + log: &Logger, + c: &Arc, + tasks: &mut HashMap, +) -> Result<()> { + let mut to_spawn = Vec::new(); + let mut to_stop = Vec::new(); + + /* + * Tasks that manage a worker tracked in the database. + */ + let workers = c.db.workers()?; + for worker in &workers { + if let WorkerState::Destroyed = worker.state { + to_stop.push(worker.id); + } + if let Some(task) = tasks.get(&worker.id) { + if task.join_handle.is_finished() { + /* + * If the task exited but the worker is not marked as destroyed, + * start the task again. This can only happen with panics. + */ + to_spawn.push(worker.id); + } + } else { + to_spawn.push(worker.id); + } + } + + /* + * Tasks that manage a worker *not* tracked in the database. 
+ */ + let worker_ids = workers.iter().map(|w| w.id).collect::>(); + for id in tasks.keys() { + if !worker_ids.contains(id) { + to_stop.push(*id); + } + } + + for worker_id in to_stop { + let Some(task) = tasks.get(&worker_id) else { continue }; + + if task.join_handle.is_finished() { + tasks.remove(&worker_id); + } else if !task.stop.swap(true, Ordering::Relaxed) { + info!(log, "stopping task for worker {worker_id}"); + } + } + + for worker_id in to_spawn { + let stop = Arc::new(AtomicBool::new(false)); + + let join_handle = tokio::spawn(worker_loop( + Arc::clone(c), + worker_id, + Arc::clone(&stop), + )); + + info!(log, "spawned task for worker {worker_id}"); + tasks.insert(worker_id, Task { stop, join_handle }); + } + + Ok(()) +} + +async fn worker_loop(c: Arc, id: WorkerId, stop: Arc) { + let log = c.log.new(o!( + "component" => "worker", + "worker" => id.to_string(), + )); + + while !stop.load(Ordering::Relaxed) { + match crate::illumos::worker_iteration(&log, &c, id).await { + Ok(DoNext::Immediate) => {} + Ok(DoNext::Sleep) => { + tokio::time::sleep(Duration::from_secs(1)).await; + } + Err(err) => { + error!(log, "worker {id} iteration failed: {err}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } +} + +pub(crate) enum DoNext { + Sleep, + Immediate, +} + +struct Task { + join_handle: JoinHandle<()>, + stop: Arc, +} diff --git a/factory/user/src/db/mod.rs b/factory/user/src/db/mod.rs new file mode 100644 index 00000000..f8df2246 --- /dev/null +++ b/factory/user/src/db/mod.rs @@ -0,0 +1,132 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::db::types::WorkerId; +use anyhow::Result; +use buildomat_database::{conflict, DBResult, FromRow, Sqlite}; +use sea_query::{Expr, Order, Query}; +use slog::Logger; +use std::path::Path; + +mod tables; + +pub mod types { + use buildomat_database::{ + rusqlite, sqlite_integer_new_type, sqlite_ulid_new_type, + }; + + sqlite_ulid_new_type!(WorkerId); + sqlite_ulid_new_type!(JobId); + 
sqlite_integer_new_type!(InstanceSeq, u64, BigUnsigned); + + pub use super::tables::WorkerState; +} + +pub use tables::*; + +pub struct Database { + sql: Sqlite, +} + +impl Database { + pub fn new<P: AsRef<Path>>( + log: Logger, + path: P, + cache_kb: Option<u32>, + ) -> Result<Database> { + let sql = Sqlite::setup( + log, + path, + include_str!("../../schema.sql"), + cache_kb, + )?; + + Ok(Database { sql }) + } + + pub fn workers(&self) -> DBResult<Vec<Worker>> { + self.sql.tx(|tx| { + tx.get_rows( + Query::select() + .from(WorkerDef::Table) + .columns(Worker::columns()) + .order_by(WorkerDef::Id, Order::Asc) + .to_owned(), + ) + }) + } + + pub fn worker_get(&self, id: WorkerId) -> DBResult<Option<Worker>> { + self.sql.tx(|tx| tx.get_row_opt(Worker::find(id))) + } + + pub fn worker_create(&self, create: Worker) -> DBResult<WorkerId> { + self.sql.tx_immediate(|h| { + let count = h.exec_insert(create.insert())?; + assert_eq!(count, 1); + Ok(create.id) + }) + } + + pub fn worker_delete(&self, id: WorkerId) -> DBResult<()> { + self.sql.tx_immediate(|h| { + let count = h.exec_delete( + Query::delete() + .from_table(WorkerDef::Table) + .and_where(Expr::col(WorkerDef::Id).eq(id)) + .to_owned(), + )?; + assert_eq!(count, 1); + Ok(()) + }) + } + + pub fn worker_new_state( + &self, + id: WorkerId, + state: WorkerState, + ) -> DBResult<()> { + self.sql.tx_immediate(|tx| { + /* + * Get the existing state of this worker: + */ + let worker: Worker = tx.get_row(Worker::find(id))?; + + if worker.state == state { + return Ok(()); + } + + let valid_source_states: &[WorkerState] = match state { + WorkerState::Unconfigured => &[], + WorkerState::Configured => &[WorkerState::Unconfigured], + WorkerState::Broken => { + &[WorkerState::Unconfigured, WorkerState::Configured] + } + WorkerState::Destroying => &[ + WorkerState::Broken, + WorkerState::Unconfigured, + WorkerState::Configured, + ], + WorkerState::Destroyed => &[WorkerState::Destroying], + }; + + if !valid_source_states.contains(&worker.state) { + conflict!( + "worker {id} cannot move from 
state {} to {state}", + worker.state, + ); + } + + let count = tx.exec_update( + Query::update() + .table(WorkerDef::Table) + .and_where(Expr::col(WorkerDef::Id).eq(id)) + .value(WorkerDef::State, state) + .to_owned(), + )?; + assert_eq!(count, 1); + Ok(()) + }) + } +} diff --git a/factory/user/src/db/tables/mod.rs b/factory/user/src/db/tables/mod.rs new file mode 100644 index 00000000..ee1dedcb --- /dev/null +++ b/factory/user/src/db/tables/mod.rs @@ -0,0 +1,19 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +mod sublude { + pub use std::str::FromStr; + + pub use crate::db::types::*; + pub use buildomat_database::{rusqlite, sqlite_sql_enum, FromRow}; + pub use rusqlite::Row; + pub use sea_query::{ + enum_def, ColumnRef, Expr, Iden, InsertStatement, Query, SeaRc, + SelectStatement, + }; +} + +mod worker; + +pub use worker::*; diff --git a/factory/user/src/db/tables/worker.rs b/factory/user/src/db/tables/worker.rs new file mode 100644 index 00000000..acedaf34 --- /dev/null +++ b/factory/user/src/db/tables/worker.rs @@ -0,0 +1,77 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use super::sublude::*; + +sqlite_sql_enum!(WorkerState => { + Unconfigured, + Configured, + Broken, + Destroying, + Destroyed, +}); + +#[derive(Clone, Debug)] +#[enum_def(prefix = "", suffix = "Def")] +pub(crate) struct Worker { + pub(crate) id: WorkerId, + pub(crate) slot: String, + pub(crate) state: WorkerState, + pub(crate) leased_job: JobId, + pub(crate) bootstrap: String, +} + +impl FromRow for Worker { + fn columns() -> Vec<ColumnRef> { + [ + WorkerDef::Id, + WorkerDef::Slot, + WorkerDef::State, + WorkerDef::LeasedJob, + WorkerDef::Bootstrap, + ] + .into_iter() + .map(|col| { + ColumnRef::TableColumn( + SeaRc::new(WorkerDef::Table), + SeaRc::new(col), + ) + }) + .collect() + } + + fn from_row(row: &Row) -> rusqlite::Result<Worker> { + Ok(Worker { + id: row.get(0)?, + slot: row.get(1)?, + state: row.get(2)?, + leased_job: row.get(3)?, + bootstrap: row.get(4)?, + }) + } +} + +impl Worker { + 
pub(crate) fn find(id: WorkerId) -> SelectStatement { + Query::select() + .from(WorkerDef::Table) + .columns(Worker::columns()) + .and_where(Expr::col(WorkerDef::Id).eq(id)) + .to_owned() + } + + pub(crate) fn insert(&self) -> InsertStatement { + Query::insert() + .into_table(WorkerDef::Table) + .columns(Self::bare_columns()) + .values_panic([ + self.id.into(), + self.slot.clone().into(), + self.state.into(), + self.leased_job.into(), + self.bootstrap.clone().into(), + ]) + .to_owned() + } +} diff --git a/factory/user/src/factory.rs b/factory/user/src/factory.rs new file mode 100644 index 00000000..e6fd9a51 --- /dev/null +++ b/factory/user/src/factory.rs @@ -0,0 +1,239 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::db::{types::*, Worker}; +use crate::Central; +use anyhow::Result; +use slog::{debug, error, info, o, warn, Logger}; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +async fn reconcile_with_server(log: &Logger, c: &Arc) -> Result<()> { + /* + * For each worker we are tracking in the local database, check to see if + * its server record still exists. If it does not, destroy the worker. + */ + for worker in c.db.workers()? { + match worker.state { + WorkerState::Unconfigured + | WorkerState::Configured + | WorkerState::Broken => {} + WorkerState::Destroying | WorkerState::Destroyed => { + /* + * We don't need to keep looking at workers that are currently + * being destroyed. + */ + continue; + } + } + + let id = worker.id; + let leased_job = worker.leased_job; + let server_worker = c + .client + .factory_worker_get() + .worker(id.to_string()) + .send() + .await? + .into_inner() + .worker; + + if let Some(server_worker) = server_worker { + if server_worker.recycle { + /* + * If the worker has been deleted through the administrative API + * then we need to tear it down straight away. 
+ */ + info!(log, "worker {id} recycled, destroying it"); + c.db.worker_new_state(id, WorkerState::Destroying)?; + } else if !matches!(worker.state, WorkerState::Broken) + && !server_worker.online + { + /* + * If the worker has not yet bootstrapped try to renew the + * lease with the server. This should prevent duplicate + * instance creation when creation or bootstrap is taking + * longer than expected. + */ + debug!( + log, + "worker {id} has not bootstrapped yet, \ + renewing lease for job {leased_job}" + ); + c.client + .factory_lease_renew() + .job(leased_job.to_string()) + .send() + .await?; + } + } else { + warn!(log, "worker {id} missing from the server, destroying it"); + c.db.worker_new_state(id, WorkerState::Destroying)?; + }; + } + + /* + * At this point we have examined all of the workers which exist in the + * local database. If there are any worker records on the server left that + * do not have an associated record in the factory, they must be scrubbed + * from the server as detritus from prior failed runs. + */ + for worker in c.client.factory_workers().send().await?.into_inner() { + let id = WorkerId::from_str(&worker.id)?; + + /* + * There is a record of a particular instance ID for this worker. + * Check to see if that instance exists. + */ + if let Some(local) = c.db.worker_get(id)? { + if matches!(local.state, WorkerState::Destroyed) { + /* + * The worker exists, but is terminated. Delete the + * worker on the server too. + */ + info!(log, "deleting terminated worker {id} from the server"); + c.client + .factory_worker_destroy() + .worker(&worker.id) + .send() + .await?; + c.db.worker_delete(id)?; + } else { + if worker.private.is_none() { + /* + * Starting the worker might've failed before we could + * associate the slot with it. Rectify that now. 
+ */ + warn!( + log, + "worker {id} doesn't have a slot associated on the \ + server, associating it" + ); + c.client + .factory_worker_associate() + .worker(&worker.id) + .body_map(|b| b.private(&local.slot)) + .send() + .await?; + } + } + } else { + /* + * The worker does not exist in the local database. + */ + warn!(log, "clearing unknown worker {id} from the server"); + c.client.factory_worker_destroy().worker(&worker.id).send().await?; + } + } + + Ok(()) +} + +async fn start_worker(log: &Logger, c: &Arc) -> Result { + let used_slots = + c.db.workers()? + .into_iter() + .filter(|w| !matches!(w.state, WorkerState::Destroyed)) + .map(|w| w.slot) + .collect::>(); + let available_targets = c + .config + .slots + .iter() + .filter(|(name, _)| !used_slots.contains(name.as_str())) + .map(|(_, slot)| slot.target.clone()) + /* + * Deduplicate the targets by first collecting into a HashSet. + */ + .collect::>() + .into_iter() + .collect::>(); + debug!(log, "slots currently in use: {used_slots:?}"); + + /* + * Check to see if the server requires any new workers. + */ + let lease = c + .client + .factory_lease() + .body_map(|b| b.supported_targets(available_targets)) + .send() + .await? + .into_inner(); + let Some(lease) = lease.lease else { + return Ok(false); + }; + + /* + * Locate a slot for the target the server asked for. + */ + let Some(slot) = c + .config + .slots + .iter() + .filter(|(name, _)| !used_slots.contains(name.as_str())) + .filter(|(_, slot)| slot.target == lease.target) + .map(|(name, _)| name.clone()) + .next() + else { + warn!( + log, + "server wants target {:?}, but no slots for it are available", + lease.target + ); + return Ok(false); + }; + + /* + * Create the worker on the server and save it in the database. There could + * be a failure between the two parts, but reconciliation will handle it (by + * deleting the worker from the server). 
+ */ + let worker = c + .client + .factory_worker_create() + .body_map(|b| b.target(&lease.target).wait_for_flush(false)) + .send() + .await?; + c.db.worker_create(Worker { + id: WorkerId::from_str(&worker.id)?, + slot: slot.clone(), + state: WorkerState::Unconfigured, + leased_job: JobId::from_str(&lease.job)?, + bootstrap: worker.bootstrap.to_string(), + })?; + info!(log, "created worker {}", worker.id); + + /* + * Record the slot we used in the server. If it fails, reconciliation will + * take care of updating the value on the server. + */ + c.client + .factory_worker_associate() + .worker(&worker.id) + .body_map(|b| b.private(slot)) + .send() + .await?; + + Ok(true) +} + +async fn factory_iteration(log: &Logger, c: &Arc) -> Result<()> { + reconcile_with_server(log, c).await?; + while start_worker(log, c).await? {} + Ok(()) +} + +pub(crate) async fn factory_loop(c: Arc) { + let log = c.log.new(o!("component" => "factory")); + + loop { + if let Err(e) = factory_iteration(&log, &c).await { + error!(log, "factory task error: {:?}", e); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/factory/user/src/illumos/chroot.rs b/factory/user/src/illumos/chroot.rs new file mode 100644 index 00000000..dab028c1 --- /dev/null +++ b/factory/user/src/illumos/chroot.rs @@ -0,0 +1,207 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::db::types::WorkerId; +use crate::Central; +use anyhow::{anyhow, bail, Context as _, Result}; +use buildomat_common::unix::{Passwd, Uid}; +use buildomat_common::OutputExt as _; +use std::iter::once; +use std::os::unix::fs::chown; +use std::path::{Path, PathBuf}; +use std::process::Command; + +const ZFS: &str = "/usr/sbin/zfs"; +const MOUNT: &str = "/usr/sbin/mount"; +const UMOUNT: &str = "/usr/sbin/umount"; + +/* + * The user home directory will be dynamically added to the list. 
+ */ +const EPHEMERAL_PATHS: &[&str] = + &["/tmp", "/var/tmp", "/var/run", "/work", "/input", "/opt/buildomat"]; + +/* + * Create a directory suitable to be chroot'd into, to execute the job. + * + * The goal of this factory is to run jobs in a multi-user host, but even with + * the isolation provided by UNIX users we can't let the jobs run with regular + * access to the host's filesystem: + * + * - One job's temporary files shouldn't interfere with temporary files from + * other jobs or the host, so we'd need isolated /tmp and /var/tmp. + * + * - Buildomat commits to providing the /work and /input directories to the job, + * but providing them on the host filesystem would mean only one job can run + * in each host at the time, which is not really efficient. + * + * - As defense-in-depth, it'd be better to enforce on a filesystem level that + * everything except the few directories dedicated to the build are read-only, + * to protect from misconfigured permissions. + * + * This function prepares a directory with a collection of nested mount points + * to provide a read-only view of the filesystem with just a few allowlisted + * paths that are writeable. + */ +pub(super) fn prepare( + central: &Central, + worker: WorkerId, + user: Uid, +) -> Result { + let passwd = Passwd::by_uid(user)? + .ok_or_else(|| anyhow!("could not locate user {user:?}"))?; + let home = passwd + .dir + .as_deref() + .ok_or_else(|| { + anyhow!("could not locate home directory of user {user:?}") + })? + .to_str() + .ok_or_else(|| anyhow!("non-UTF-8 home directory for user {user:?}"))?; + + cleanup(central, worker) + .context("failed to cleanup before preparing the chroot")?; + + /* + * Create the loopback virtual file system, mapping the root filesystem into + * the chroot as a read-only device. We will add writeable mount points on + * top of it to let the build write data. 
+ */ + let root = root_dir(worker); + std::fs::create_dir_all(&root)?; + mount("lofs", Path::new("/"), &root, &["-o", "ro"])?; + + /* + * The ZFS dataset to store written data is created with sync=disabled to + * improve build performance at the cost of data integrity in case of a + * sudden power loss. We don't care about that property for jobs. + */ + let dataset = zfs_dataset_name(central, worker); + run(Command::new(ZFS) + .args(["create", "-p", "-osync=disabled"]) + .arg(&dataset)) + .with_context(|| format!("failed to create ZFS dataset {dataset:?}"))?; + + /* + * All writeable directories will be backed by the ZFS dataset created + * earlier rather than a tmpfs, to avoid exhausting RAM. + */ + for writable in EPHEMERAL_PATHS.iter().chain(once(&home)) { + let src = zfs_dataset_mount(central, worker) + .join(writable.trim_start_matches('/')); + std::fs::create_dir_all(&src)?; + chown(&src, Some(passwd.uid.0), Some(passwd.gid.0))?; + + /* + * Mounting requires the mount point to be an existing directory. This + * is trickier than it sounds, as we cannot create the directory inside + * of the chroot (the loopback mount is read-only!). We thus have to + * create the directory in the root filesystem. + */ + std::fs::create_dir_all(writable)?; + + let dest = root.join(writable.trim_start_matches('/')); + mount("lofs", &src, &dest, &[])?; + } + + Ok(root) +} + +pub(super) fn cleanup(central: &Central, worker: WorkerId) -> Result<()> { + /* + * When changing this function, keep in mind that it needs to succeed even + * when it was interrupted halfway through in the past, or when there is + * nothing left to clean. + */ + + let root = root_dir(worker); + let dataset = zfs_dataset_name(central, worker); + + /* + * The chroot contains a read-only loopback mount of / and other mountpoints + * nested into it, providing write access to specific directories. We need + * to delete the nested mount points before we delete the parent mount, or + * the unmounting will fail. 
+ * + * Thankfully /etc/mnttab lists mount points ordered by mount time, so if we + * process them in reverse we'll do the correct thing. + */ + for path in paths_with_mounts()?.iter().rev() { + if path.starts_with(&root) { + run(Command::new(UMOUNT).arg(path)) + .with_context(|| format!("failed to unmount {path:?}"))?; + } + } + + if list_zfs_datasets()?.contains(&dataset) { + run(Command::new(ZFS).arg("destroy").arg(&dataset)).with_context( + || format!("failed to destroy ZFS dataset {dataset:?}"), + )?; + } + + if root.exists() { + std::fs::remove_dir_all(&root) + .with_context(|| format!("failed to remove {root:?}"))?; + } + + Ok(()) +} + +fn root_dir(worker: WorkerId) -> PathBuf { + Path::new("/var/run/buildomat/worker-roots").join(worker.to_string()) +} + +fn zfs_dataset_name(central: &Central, worker: WorkerId) -> String { + let parent = central + .config + .illumos + .parent_zfs_dataset + .as_deref() + .unwrap_or("rpool/buildomat-workers"); + format!("{parent}/{worker}") +} + +fn zfs_dataset_mount(central: &Central, worker: WorkerId) -> PathBuf { + Path::new("/").join(zfs_dataset_name(central, worker)) +} + +fn mount(fstype: &str, from: &Path, to: &Path, opts: &[&str]) -> Result<()> { + run(Command::new(MOUNT).args(["-F", fstype]).args(opts).arg(from).arg(to)) + .with_context(|| { + format!("failed to mount from {from:?} to {to:?} with {fstype}") + })?; + Ok(()) +} + +fn list_zfs_datasets() -> Result> { + Ok(run(Command::new(ZFS).args(["list", "-H", "-o", "name"])) + .context("failed to list ZFS datasets")? + .lines() + .filter(|line| !line.is_empty()) + .map(|line| line.into()) + .collect()) +} + +fn paths_with_mounts() -> Result> { + /* + * The format of /etc/mnttab is described in mnttab(5). + */ + Ok(std::fs::read_to_string("/etc/mnttab") + .context("failed to read mnttab")? 
+ .lines() + .filter_map(|line| line.split('\t').nth(1)) + .map(PathBuf::from) + .collect()) +} + +fn run(cmd: &mut Command) -> Result { + let name = cmd.get_program().to_string_lossy().to_string(); + let output = cmd + .output() + .with_context(|| format!("failed to spawn command {name:?}"))?; + if !output.status.success() { + bail!("{name:?} failed: {}", output.info()); + } + output.stdout_string() +} diff --git a/factory/user/src/illumos/mod.rs b/factory/user/src/illumos/mod.rs new file mode 100644 index 00000000..02d0bd8c --- /dev/null +++ b/factory/user/src/illumos/mod.rs @@ -0,0 +1,300 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +mod chroot; +mod unix; +mod user; + +use crate::coordinator::DoNext; +use crate::{bootstrap_agent::BootstrapContext, db::types::*, Central}; +use anyhow::{anyhow, bail, Result}; +use base64::prelude::{Engine as _, BASE64_STANDARD}; +use buildomat_common::unix::Passwd; +use slog::{debug, error, info, warn, Logger}; +use smf::scf_type_t::SCF_TYPE_ASTRING; +use smf::{CommitResult, Scf, State as SmfState}; +use std::time::{Duration, Instant}; + +const SMF_SERVICE: &str = "site/buildomat/factory-user-worker"; + +pub(crate) async fn worker_iteration( + log: &Logger, + c: &Central, + id: WorkerId, +) -> Result { + let worker = c.db.worker_get(id)?; + let Some(worker) = worker else { + bail!("no worker {id} in the database?"); + }; + + match worker.state { + WorkerState::Unconfigured => { + let scf = Scf::new()?; + let scope = scf.scope_local()?; + let service = scope + .get_service(SMF_SERVICE)? + .ok_or_else(|| anyhow!("missing SMF service {SMF_SERVICE}"))?; + + /* + * There is a small chance the slot might not exist, if it was + * removed from the configuration file after the worker was created + * but before this function was called. 
+ */ + let Some(slot) = c.config.slots.get(&worker.slot) else { + error!(log, "worker {id} uses unknown slot {:?}", worker.slot); + c.db.worker_new_state(id, WorkerState::Destroying)?; + return Ok(DoNext::Immediate); + }; + + let (uid, gid) = user::create(id, slot)?; + let chroot = chroot::prepare(c, id, uid)?; + + let instance = match service.get_instance(&smf_name(id))? { + Some(instance) => instance, + None => service.add_instance(&smf_name(id))?, + }; + let pg = match instance.get_pg("buildomat")? { + Some(pg) => pg, + None => instance.add_pg("buildomat", "application")?, + }; + + let raw_ctx = BootstrapContext { + baseurl: c.config.general.baseurl.clone(), + bootstrap_token: worker.bootstrap, + env: slot.env.clone(), + uid, + gid, + chroot, + }; + let ctx = BASE64_STANDARD.encode(&serde_json::to_vec(&raw_ctx)?); + loop { + let tx = pg.transaction()?; + tx.start()?; + tx.property_ensure("ctx", SCF_TYPE_ASTRING, &ctx)?; + match tx.commit()? { + CommitResult::Success => break, + CommitResult::OutOfDate => pg.update()?, + } + } + + /* + * When creating an instance of a service, it's created in the SMF + * repository database (svc.configd) but it doesn't seem to end up + * fully initialised by the restarter daemon (svc.startd) unless the + * restarter is told to do something. That causes weird errors when + * attempting to look at the state of the service. + * + * The presence of a running snapshot indicates the restarter daemon + * knows about the service, so if none is present we do any action + * (like in our case disabling the service) to let the restarter the + * service actually exists. 
+ */ + if instance.get_running_snapshot().is_err() { + instance.disable(false)?; + } + + instance.enable(false)?; + c.db.worker_new_state(id, WorkerState::Configured)?; + info!(log, "configured worker {id}"); + + Ok(DoNext::Sleep) + } + WorkerState::Configured => { + let (current_state, target_state) = { + let scf = Scf::new()?; + let scope = scf.scope_local()?; + let service = + scope.get_service(SMF_SERVICE)?.ok_or_else(|| { + anyhow!("missing SMF service {SMF_SERVICE}") + })?; + + let Some(instance) = service.get_instance(&smf_name(id))? + else { + warn!( + log, + "SMF instance for worker {id} disappeared, \ + destroying the worker" + ); + c.db.worker_new_state(id, WorkerState::Destroying)?; + return Ok(DoNext::Immediate); + }; + + instance.states()? + }; + + /* + * We don't care about the current state when the service is + * transitioning to a new state. + */ + if target_state.is_some() { + return Ok(DoNext::Sleep); + } + + match current_state { + Some(SmfState::Online) => Ok(DoNext::Sleep), + Some(SmfState::Uninitialized) => { + /* + * This occurs briefly prior to svc.startd(8) picking up the + * new instance. + */ + Ok(DoNext::Sleep) + } + Some(SmfState::Maintenance) => { + /* + * This is a terminal failure state that means the worker is + * not running anymore. + */ + warn!( + log, + "SMF instance in maintenance, \ + marking worker {id} as broken" + ); + + /* + * Mark the worker as failed on the core server. This will + * mark the job running on the worker (if any) as failed, + * and hold the worker so that an operator can look at it. + */ + c.client + .factory_worker_fail() + .worker(id.to_string()) + .body_map(|b| b.reason("SMF service in maintenance")) + .send() + .await?; + + c.db.worker_new_state(id, WorkerState::Broken)?; + Ok(DoNext::Immediate) + } + state => { + warn!(log, "SMF instance reached unknown state {state:?}"); + + /* + * Mark the worker as failed on the core server. 
This will + * mark the job running on the worker (if any) as failed, + * and hold the worker so that an operator can look at it. + */ + c.client + .factory_worker_fail() + .worker(id.to_string()) + .body_map(|b| { + b.reason(format!( + "SMF instance reached unknown state {state:?}" + )) + }) + .send() + .await?; + c.db.worker_new_state(id, WorkerState::Broken)?; + + Ok(DoNext::Immediate) + } + } + } + WorkerState::Destroying => { + { + let scf = Scf::new()?; + let scope = scf.scope_local()?; + let svc = scope.get_service(SMF_SERVICE)?.ok_or_else(|| { + anyhow!("missing SMF service {SMF_SERVICE}") + })?; + + if let Some(inst) = svc.get_instance(&smf_name(id))? { + let (current_state, target_state) = inst.states()?; + if target_state.is_some() { + return Ok(DoNext::Sleep); + } + match current_state { + Some(SmfState::Disabled | SmfState::Maintenance) => { + /* + * This is a terminal state and we can delete the + * service instance. + */ + inst.delete()?; + } + _ => { + info!( + log, + "disabling SMF instance for worker {id}" + ); + inst.disable(true)?; + return Ok(DoNext::Sleep); + } + } + }; + /* + * The semicolon above is load-bearing. + */ + } + + /* + * While we have disabled the service, it is possible that some + * processes could have been created outside the contract. Ensure + * all processes for the build user are terminated and then remove + * any files left behind. + */ + if let Some(user) = Passwd::by_name(&user::name(id))? { + kill_all(log, &user).await?; + } + + chroot::cleanup(c, id)?; + user::remove(id)?; + + info!(log, "worker {id} successfully destroyed"); + c.db.worker_new_state(id, WorkerState::Destroyed)?; + Ok(DoNext::Immediate) + } + WorkerState::Broken => { + /* + * We will remain in the broken state until the core server decides + * to recycle the worker. 
+ */ + Ok(DoNext::Sleep) + } + WorkerState::Destroyed => { + /* + * The coordinator will take care of stopping the task instead of + * sleeping when it's time for it to + */ + Ok(DoNext::Sleep) + } + } +} + +fn smf_name(id: WorkerId) -> String { + format!("bmat-{id}") +} + +async fn kill_all(log: &Logger, u: &Passwd) -> Result<()> { + let user = if let Some(name) = &u.name { + format!("user {name} (uid {})", u.uid.0) + } else { + format!("uid {}", u.uid.0) + }; + + let id = unix::SigSendId::UserId(u.uid.0); + if unix::sigsend_maybe(id, libc::SIGKILL)? { + debug!(log, "killed leftover processes for {user}"); + + /* + * There are a number of reasons that processes might linger for a + * little while in the table after we terminate them. Wait for a bit to + * make sure they're all gone: + */ + let start = Instant::now(); + while Instant::now().saturating_duration_since(start).as_secs() < 10 { + /* + * Try with the 0 argument, which can be used to confirm that no + * processes exist. + */ + if !unix::sigsend_maybe(id, 0)? 
{ + return Ok(()); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + + bail!("processes for {user} still exist?"); + } + + Ok(()) +} diff --git a/factory/user/src/illumos/unix.rs b/factory/user/src/illumos/unix.rs new file mode 100644 index 00000000..04ce2d12 --- /dev/null +++ b/factory/user/src/illumos/unix.rs @@ -0,0 +1,48 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use anyhow::{bail, Result}; + +#[cfg(target_os = "illumos")] +mod sys { + use libc::{c_int, id_t, idtype_t}; + + pub const P_UID: idtype_t = 5; + + extern "C" { + pub fn sigsend(idtype: idtype_t, id: id_t, sig: c_int) -> c_int; + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +pub enum SigSendId { + UserId(u32), +} + +#[cfg(target_os = "illumos")] +pub fn sigsend_maybe(id: SigSendId, sig: i32) -> Result { + let (idtype, id) = match id { + SigSendId::UserId(uid) => (sys::P_UID, uid as i32), + }; + + if unsafe { sys::sigsend(idtype, id, sig) } == 0 { + return Ok(true); + } + let error = std::io::Error::last_os_error(); + match error.raw_os_error() { + Some(libc::ESRCH) => { + /* + * No processes were found that matched the criteria. 
+ */ + Ok(false) + } + _ => bail!("sigsend({idtype}, {id}, {sig}): {error}"), + } +} + +#[cfg(not(target_os = "illumos"))] +pub fn sigsend_maybe(_id: SigSendId, _sig: i32) -> Result<bool> { + bail!("only works on illumos systems"); +} diff --git a/factory/user/src/illumos/user.rs b/factory/user/src/illumos/user.rs new file mode 100644 index 00000000..11f9e3df --- /dev/null +++ b/factory/user/src/illumos/user.rs @@ -0,0 +1,131 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::config::ConfigFileSlot; +use crate::db::types::WorkerId; +use anyhow::{bail, Context as _, Result}; +use buildomat_common::unix::{Gid, Uid}; +use rand::Rng as _; +use std::ops::Range; +use std::path::Path; +use std::process::Command; + +const ID_RANGE: Range<u32> = 2u32.pow(20)..2u32.pow(30); + +pub(super) fn create( + worker: WorkerId, + slot: &ConfigFileSlot, +) -> Result<(Uid, Gid)> { + remove(worker).context("failed to cleanup users before creating them")?; + + let gid = loop { + let gid = rand::rng().random_range(ID_RANGE); + + let output = Command::new("/usr/sbin/groupadd") + .arg("-g") + .arg(gid.to_string()) + .arg(name(worker)) + .output() + .context("failed to spawn groupadd")?; + + match output.status.code() { + Some(0) => break gid, + /* + * Exit status of 4 indicates the random gid we generated already + * exists: reroll another one in the next iteration. 
+ */ + Some(4) => continue, + _ => { + bail!( + "adding group {:?} exited with {}: {}", + name(worker), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + } + }; + + let home = Path::new("/home").join(name(worker)); + let uid = loop { + let uid = rand::rng().random_range(ID_RANGE); + + let mut cmd = Command::new("/usr/sbin/useradd"); + cmd.arg("-u").arg(uid.to_string()); + cmd.arg("-d").arg(&home); + cmd.arg("-g").arg(gid.to_string()); + for group in &slot.add_to_groups { + cmd.arg("-G").arg(group); + } + cmd.arg(name(worker)); + + let output = cmd.output().context("failed to spawn useradd")?; + match output.status.code() { + Some(0) => break uid, + /* + * Exit status of 4 indicates the random uid we generated already + * exists: reroll another one in the next iteration. + */ + Some(4) => continue, + _ => { + bail!( + "adding user {:?} exited with {}: {}", + name(worker), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + } + }; + + Ok((Uid(uid), Gid(gid))) +} + +pub(super) fn remove(worker: WorkerId) -> Result<()> { + let userdel = Command::new("/usr/sbin/userdel") + .arg(name(worker)) + .output() + .context("failed to invoke userdel")?; + match userdel.status.code() { + Some(0) => {} + /* + * Exit status of 6 indicates that the user doesn't exist. + */ + Some(6) => {} + _ => { + bail!( + "deleting user {:?} exited with {}: {}", + name(worker), + userdel.status, + String::from_utf8_lossy(&userdel.stderr), + ); + } + } + + let groupdel = Command::new("/usr/sbin/groupdel") + .arg(name(worker)) + .output() + .context("failed to invoke groupdel")?; + match groupdel.status.code() { + Some(0) => {} + /* + * Exit status of 6 indicates that the group doesn't exist. 
+ */ + Some(6) => {} + _ => { + bail!( + "deleting group {:?} exited with {}: {}", + name(worker), + groupdel.status, + String::from_utf8_lossy(&groupdel.stderr), + ); + } + } + + Ok(()) +} + +pub(super) fn name(worker: WorkerId) -> String { + format!("bmat-{worker}") +} diff --git a/factory/user/src/main.rs b/factory/user/src/main.rs new file mode 100644 index 00000000..4ad7af43 --- /dev/null +++ b/factory/user/src/main.rs @@ -0,0 +1,99 @@ +/* + * Copyright 2026 Oxide Computer + */ + +use crate::config::ConfigFile; +use crate::coordinator::coordinator_loop; +use crate::db::Database; +use crate::factory::factory_loop; +use anyhow::{bail, Result}; +use buildomat_common::*; +use getopts::Options; +use slog::Logger; +use std::{sync::Arc, time::Duration}; + +mod bootstrap_agent; +mod config; +mod coordinator; +mod db; +mod factory; +mod illumos; + +struct Central { + log: Logger, + client: buildomat_client::Client, + config: ConfigFile, + db: Database, +} + +/* + * This factory may run on large AMD machines (e.g., 100+ SMT threads) and thus + * could up with far too many worker threads by default. + */ +#[tokio::main(worker_threads = 4)] +async fn main() -> Result<()> { + usdt::register_probes().unwrap(); + + if std::env::args().nth(1).as_deref() == Some("__bootstrap_agent") { + bootstrap_agent::run().await?; + return Ok(()); + } + + let mut opts = Options::new(); + + opts.optopt("f", "", "configuration file", "CONFIG"); + opts.optopt("d", "", "database file", "FILE"); + + let p = match opts.parse(std::env::args().skip(1)) { + Ok(p) => p, + Err(e) => { + eprintln!("ERROR: usage: {}", e); + eprintln!(" {}", opts.usage("usage")); + std::process::exit(1); + } + }; + + let log = make_log("factory-user"); + + let config: ConfigFile = if let Some(f) = p.opt_str("f").as_deref() { + read_toml(f)? + } else { + bail!("must specify configuration file (-f)"); + }; + + let db = if let Some(p) = p.opt_str("d") { + Database::new(log.clone(), p, None)? 
+ } else { + bail!("must specify database file (-d)"); + }; + + let client = buildomat_client::ClientBuilder::new(&config.general.baseurl) + .bearer_token(&config.factory.token) + .build()?; + + /* + * Install a custom panic hook that will try to exit the process after a + * short delay. This is unfortunate, but I am not sure how else to avoid a + * panicked worker thread leaving the process stuck without some of its + * functionality. + */ + let orig_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + orig_hook(info); + eprintln!("FATAL: THREAD PANIC DETECTED; EXITING IN 5 SECONDS..."); + std::thread::spawn(move || { + std::thread::sleep(Duration::from_secs(5)); + std::process::exit(101); + }); + })); + + let c = Arc::new(Central { log, config, client, db }); + + let t_coordinator = tokio::spawn(coordinator_loop(Arc::clone(&c))); + let t_factory = tokio::spawn(factory_loop(Arc::clone(&c))); + + tokio::select! { + _ = t_coordinator => bail!("coordinator task stopped early"), + _ = t_factory => bail!("factory task stopped early"), + } +} diff --git a/server/src/api/factory.rs b/server/src/api/factory.rs index 0c3bbe5a..40497f16 100644 --- a/server/src/api/factory.rs +++ b/server/src/api/factory.rs @@ -270,6 +270,53 @@ pub(crate) async fn factory_worker_flush( Ok(HttpResponseUpdatedNoContent()) } +#[derive(Deserialize, Serialize, JsonSchema)] +pub(crate) struct FactoryWorkerFail { + reason: String, +} + +#[endpoint { + method = POST, + path = "/0/factory/worker/{worker}/fail", +}] +pub(crate) async fn factory_worker_fail( + rqctx: RequestContext>, + path: TypedPath, + body: TypedBody, +) -> DSResult { + let c = rqctx.context(); + let log = &rqctx.log; + + let worker_id = path.into_inner().worker()?; + let reason = body.into_inner().reason; + + let factory = c.require_factory(log, &rqctx.request).await?; + let worker = c.db.worker(worker_id).or_500()?; + factory.owns(log, &worker)?; + + warn!( + log, "worker failed!"; + "id" => 
worker.id.to_string(), "reason" => &reason, + ); + + /* + * Record in the database that the worker has failed. This routine will + * take care of reporting failure in any assigned jobs, marking the worker + * as held, etc. + */ + let failed_jobs = c.db.worker_mark_failed(worker.id, &reason).or_500()?; + if !failed_jobs.is_empty() { + let jobs = failed_jobs + .into_iter() + .map(|j| j.to_string()) + .collect::<Vec<_>>() + .join(", "); + warn!(log, "worker {} failing caused jobs {jobs} to fail", worker.id); + } + + Ok(HttpResponseUpdatedNoContent()) +} + #[derive(Debug, Deserialize, JsonSchema)] pub(crate) struct FactoryWorkerAssociate { private: String, diff --git a/server/src/main.rs b/server/src/main.rs index d6b762e1..048581df 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1011,6 +1011,7 @@ async fn main() -> Result<()> { ad.register(api::factory::factory_ping)?; ad.register(api::factory::factory_worker_create)?; ad.register(api::factory::factory_worker_append)?; + ad.register(api::factory::factory_worker_fail)?; ad.register(api::factory::factory_worker_flush)?; ad.register(api::factory::factory_worker_associate)?; ad.register(api::factory::factory_worker_destroy)?;