diff --git a/server/src/api/factory.rs b/server/src/api/factory.rs index 0c3bbe5a..1ac1b19b 100644 --- a/server/src/api/factory.rs +++ b/server/src/api/factory.rs @@ -3,6 +3,7 @@ */ use super::prelude::*; +use crate::api::worker::validate_stream; trait WorkerOwns { fn owns(&self, log: &Logger, worker: &db::Worker) -> DSResult<()>; @@ -199,6 +200,8 @@ pub(crate) async fn factory_worker_append( let job = c.db.worker_job(w.id).or_500()?; + validate_stream(&b.stream)?; + let retry = if let Some(job) = job { if job.complete { /* diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index f091c4e8..860ae2e8 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -3,7 +3,9 @@ */ mod prelude { - pub(crate) use crate::{db, unauth_response, Central, MakeInternalError}; + pub(crate) use crate::{ + bad_request, db, unauth_response, Central, MakeInternalError, + }; pub use anyhow::Result; pub use buildomat_download::{PotentialRange, RequestContextEx}; pub use buildomat_sse::{HeaderMapEx, ServerSentEvents}; diff --git a/server/src/api/user.rs b/server/src/api/user.rs index 9d39ca4c..351732d3 100644 --- a/server/src/api/user.rs +++ b/server/src/api/user.rs @@ -429,11 +429,7 @@ pub(crate) async fn job_output_signed_url( let b = body.into_inner(); if b.expiry_seconds > 3600 { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "URLs can last at most one hour (3600 seconds)".into(), - )); + return bad_request("URLs can last at most one hour (3600 seconds)"); } let owner = c.require_user(log, &rqctx.request).await?; @@ -492,11 +488,7 @@ impl JobOutputPublish { { Ok(()) } else { - Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "invalid published file ID".into(), - )) + bad_request("invalid published file ID") } } } @@ -959,10 +951,8 @@ fn parse_output_rule(input: &str) -> DSResult { s = State::SlashOrEquals; } other => { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!("wanted sigil/absolute path, not {:?}", other), + return bad_request(format!( + "wanted sigil/absolute path, not {other:?}" )); } }, @@ -976,10 +966,8 @@ fn parse_output_rule(input: &str) -> DSResult { s = State::Slash; } other => { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!("{:?} unexpected in output rule", other), + return bad_request(format!( + "{other:?} unexpected in output rule", )); } }, @@ -993,10 +981,8 @@ fn parse_output_rule(input: &str) -> DSResult { s = State::Slash; } other => { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!("{:?} unexpected in output rule", other), + return bad_request(format!( + "{other:?} unexpected in output rule", )); } }, @@ -1006,10 +992,8 @@ fn parse_output_rule(input: &str) -> DSResult { s = State::Rule; } other => { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!("wanted '/', not {:?}, in output rule", other), + return bad_request(format!( + "wanted '/', not {other:?}, in output rule", )); } }, @@ -1018,11 +1002,7 @@ fn parse_output_rule(input: &str) -> DSResult { } if !rule.starts_with('/') { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "output rule pattern must be absolute path".to_string(), - )); + return bad_request("output rule pattern must be absolute path"); } if ignore { @@ -1071,37 +1051,21 @@ pub(crate) async fn job_submit( let new_job = new_job.into_inner(); if new_job.tasks.len() > 100 { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "too many tasks".into(), - )); + return bad_request("too many tasks"); } if new_job.inputs.len() > 25 { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "too many inputs".into(), - )); + return bad_request("too many inputs"); } if new_job.tags.len() > 100 { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "too many tags".into(), - )); + return bad_request("too many tags"); } if new_job.tags.iter().map(|(n, v)| n.len() + v.len()).sum::() > 131072 { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "total size of all tags is larger than 128KB".into(), - )); + return bad_request("total size of all tags is larger than 128KB"); } for n in new_job.tags.keys() { @@ -1119,11 +1083,7 @@ pub(crate) async fn job_submit( || c == '-' }) { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "tag names must be [0-9a-z._-]+".into(), - )); + return bad_request("tag names must be [0-9a-z._-]+"); } } @@ -1135,10 +1095,9 @@ pub(crate) async fn job_submit( Some(target) => target, None => { info!(log, "could not resolve target name {:?}", new_job.target); - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!("could not resolve target name {:?}", new_job.target), + return bad_request(format!( + "could not resolve target name {:?}", + new_job.target )); } }; @@ -1285,22 +1244,14 @@ pub(crate) async fn job_add_input( let add = add.into_inner(); if add.name.contains('/') { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "name must not be a path".into(), - )); + return bad_request("name must not be a path"); } let max = c.config.job.max_bytes_per_input(); if add.size > max { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!( - "input file size {} bigger than allowed maximum {max} bytes", - add.size, - ), + return bad_request(format!( + "input file size {} bigger than allowed maximum {max} bytes", + add.size, )); } @@ -1373,11 +1324,7 @@ pub(crate) async fn job_add_input( add.size, e, ); - Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("{}", e), - )) + bad_request(e) } } } @@ -1407,11 +1354,7 @@ pub(crate) async fn job_add_input_sync( let job = c.load_job_for_user(log, &owner, p.job()?).await?; if !job.waiting { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::CONFLICT, - "cannot add inputs to a job that is not waiting".into(), - )); + return bad_request("cannot add inputs to a job that is not waiting"); } /* @@ -1421,20 +1364,15 @@ pub(crate) async fn job_add_input_sync( */ let add = add.into_inner(); let addsize = if add.size < 0 || add.size > 1024 * 1024 * 1024 { - return Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("size {} must be between 0 and 1073741824", add.size), + return bad_request(format!( + "size {} must be between 0 and 1073741824", + add.size )); } else { add.size as u64 }; if add.name.contains('/') { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - "name must not be a path".into(), - )); + return bad_request("name must not be a path"); } let chunks = add @@ -1456,11 +1394,7 @@ pub(crate) async fn job_add_input_sync( addsize, e, ); - return Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("{:?}", e), - )); + return bad_request(format!("{e:?}")); } }; diff --git a/server/src/api/worker.rs b/server/src/api/worker.rs index 544b573d..9226a02e 100644 --- a/server/src/api/worker.rs +++ b/server/src/api/worker.rs @@ -346,6 +346,8 @@ pub(crate) async fn worker_job_append_one( let j = c.db.job(path.into_inner().job()?).or_500()?; /* XXX */ w.owns(log, &j)?; + validate_stream(&a.stream)?; + info!(log, "worker {} append to job {} stream {}", w.id, j.id, a.stream); c.db.job_append_event( @@ -387,6 +389,10 @@ pub(crate) async fn worker_job_append( let j = c.db.job(path.into_inner().job()?).or_500()?; /* XXX */ w.owns(log, &j)?; + for entry in &a { + validate_stream(&entry.stream)?; + } + info!(log, "worker {} append {} events to job {}", w.id, a.len(), j.id); c.db.job_append_events( @@ -423,6 +429,8 @@ pub(crate) async fn worker_task_append( let j = c.db.job(p.job()?).or_500()?; /* XXX */ w.owns(log, &j)?; + validate_stream(&a.stream)?; + info!( log, "worker {} append to job {} task {} stream {}", @@ -674,13 +682,9 @@ pub(crate) async fn worker_job_add_output( let max = c.config.job.max_bytes_per_output(); if add.size > max { - return Err(HttpError::for_client_error( - None, - ClientErrorStatusCode::BAD_REQUEST, - format!( - "output file size {} bigger than allowed maximum {max} bytes", - add.size, - ), + return bad_request(format!( + "output file size {} bigger than allowed maximum {max} bytes", + add.size, )); } @@ -726,11 +730,7 @@ pub(crate) async fn worker_job_add_output( add.size, e, ); - Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("{}", e), - )) + return bad_request(e); } } } @@ -762,10 +762,9 @@ pub(crate) async fn worker_job_add_output_sync( */ let add = add.into_inner(); let addsize = if add.size < 0 || add.size > 1024 * 1024 * 1024 { - return Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("size {} must be between 0 and 1073741824", add.size), + return bad_request(format!( + "size {} must be between 0 and 1073741824", + add.size )); } else { add.size as u64 @@ -793,11 +792,7 @@ pub(crate) async fn worker_job_add_output_sync( addsize, e, ); - return Err(HttpError::for_client_error( - Some("invalid".to_string()), - ClientErrorStatusCode::BAD_REQUEST, - format!("{:?}", e), - )); + return bad_request(format!("{e:?}")); } }; @@ -864,6 +859,10 @@ pub(crate) async fn worker_append( let a = append.into_inner(); + for entry in &a { + validate_stream(&entry.stream)?; + } + info!(log, "worker {} append {} self events", w.id, a.len()); c.db.worker_append_events( @@ -925,3 +924,21 @@ pub(crate) async fn worker_diagnostics_enable( Ok(HttpResponseUpdatedNoContent()) } + +pub(crate) fn validate_stream(name: &str) -> DSResult<()> { + if name.len() > 64 { + return bad_request(format!( + "stream name {name:?} is longer than 64 bytes" + )); + } + + if let Some(c) = name.chars().find(|c| { + !c.is_ascii_alphanumeric() && *c != '-' && *c != '_' && *c != '.' + }) { + return bad_request(format!( + "invalid char {c:?} in stream name {name:?}" + )); + } + + Ok(()) +} diff --git a/server/src/main.rs b/server/src/main.rs index d6b762e1..d9ac645a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -133,6 +133,14 @@ pub(crate) fn unauth_response() -> SResult { )) } +pub(crate) fn bad_request(message: impl ToString) -> SResult { + Err(HttpError::for_client_error( + None, + ClientErrorStatusCode::BAD_REQUEST, + message.to_string(), + )) +} + impl Central { fn _int_delegate_username( &self,