Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/src/api/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

use super::prelude::*;
use crate::api::worker::validate_stream;

trait WorkerOwns {
fn owns(&self, log: &Logger, worker: &db::Worker) -> DSResult<()>;
Expand Down Expand Up @@ -199,6 +200,8 @@ pub(crate) async fn factory_worker_append(

let job = c.db.worker_job(w.id).or_500()?;

validate_stream(&b.stream)?;

let retry = if let Some(job) = job {
if job.complete {
/*
Expand Down
4 changes: 3 additions & 1 deletion server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
*/

mod prelude {
pub(crate) use crate::{db, unauth_response, Central, MakeInternalError};
pub(crate) use crate::{
bad_request, db, unauth_response, Central, MakeInternalError,
};
pub use anyhow::Result;
pub use buildomat_download::{PotentialRange, RequestContextEx};
pub use buildomat_sse::{HeaderMapEx, ServerSentEvents};
Expand Down
126 changes: 30 additions & 96 deletions server/src/api/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,7 @@ pub(crate) async fn job_output_signed_url(
let b = body.into_inner();

if b.expiry_seconds > 3600 {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"URLs can last at most one hour (3600 seconds)".into(),
));
return bad_request("URLs can last at most one hour (3600 seconds)");
}

let owner = c.require_user(log, &rqctx.request).await?;
Expand Down Expand Up @@ -492,11 +488,7 @@ impl JobOutputPublish {
{
Ok(())
} else {
Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"invalid published file ID".into(),
))
bad_request("invalid published file ID")
}
}
}
Expand Down Expand Up @@ -959,10 +951,8 @@ fn parse_output_rule(input: &str) -> DSResult<db::CreateOutputRule> {
s = State::SlashOrEquals;
}
other => {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!("wanted sigil/absolute path, not {:?}", other),
return bad_request(format!(
"wanted sigil/absolute path, not {other:?}"
));
}
},
Expand All @@ -976,10 +966,8 @@ fn parse_output_rule(input: &str) -> DSResult<db::CreateOutputRule> {
s = State::Slash;
}
other => {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!("{:?} unexpected in output rule", other),
return bad_request(format!(
"{other:?} unexpected in output rule",
));
}
},
Expand All @@ -993,10 +981,8 @@ fn parse_output_rule(input: &str) -> DSResult<db::CreateOutputRule> {
s = State::Slash;
}
other => {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!("{:?} unexpected in output rule", other),
return bad_request(format!(
"{other:?} unexpected in output rule",
));
}
},
Expand All @@ -1006,10 +992,8 @@ fn parse_output_rule(input: &str) -> DSResult<db::CreateOutputRule> {
s = State::Rule;
}
other => {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!("wanted '/', not {:?}, in output rule", other),
return bad_request(format!(
"wanted '/', not {other:?}, in output rule",
));
}
},
Expand All @@ -1018,11 +1002,7 @@ fn parse_output_rule(input: &str) -> DSResult<db::CreateOutputRule> {
}

if !rule.starts_with('/') {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"output rule pattern must be absolute path".to_string(),
));
return bad_request("output rule pattern must be absolute path");
}

if ignore {
Expand Down Expand Up @@ -1071,37 +1051,21 @@ pub(crate) async fn job_submit(
let new_job = new_job.into_inner();

if new_job.tasks.len() > 100 {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"too many tasks".into(),
));
return bad_request("too many tasks");
}

if new_job.inputs.len() > 25 {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"too many inputs".into(),
));
return bad_request("too many inputs");
}

if new_job.tags.len() > 100 {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"too many tags".into(),
));
return bad_request("too many tags");
}

if new_job.tags.iter().map(|(n, v)| n.len() + v.len()).sum::<usize>()
> 131072
{
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"total size of all tags is larger than 128KB".into(),
));
return bad_request("total size of all tags is larger than 128KB");
}

for n in new_job.tags.keys() {
Expand All @@ -1119,11 +1083,7 @@ pub(crate) async fn job_submit(
|| c == '-'
})
{
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"tag names must be [0-9a-z._-]+".into(),
));
return bad_request("tag names must be [0-9a-z._-]+");
}
}

Expand All @@ -1135,10 +1095,9 @@ pub(crate) async fn job_submit(
Some(target) => target,
None => {
info!(log, "could not resolve target name {:?}", new_job.target);
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!("could not resolve target name {:?}", new_job.target),
return bad_request(format!(
"could not resolve target name {:?}",
new_job.target
));
}
};
Expand Down Expand Up @@ -1285,22 +1244,14 @@ pub(crate) async fn job_add_input(

let add = add.into_inner();
if add.name.contains('/') {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"name must not be a path".into(),
));
return bad_request("name must not be a path");
}

let max = c.config.job.max_bytes_per_input();
if add.size > max {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!(
"input file size {} bigger than allowed maximum {max} bytes",
add.size,
),
return bad_request(format!(
"input file size {} bigger than allowed maximum {max} bytes",
add.size,
));
}

Expand Down Expand Up @@ -1373,11 +1324,7 @@ pub(crate) async fn job_add_input(
add.size,
e,
);
Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("{}", e),
))
bad_request(e)
}
}
}
Expand Down Expand Up @@ -1407,11 +1354,7 @@ pub(crate) async fn job_add_input_sync(
let job = c.load_job_for_user(log, &owner, p.job()?).await?;

if !job.waiting {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::CONFLICT,
"cannot add inputs to a job that is not waiting".into(),
));
return bad_request("cannot add inputs to a job that is not waiting");
}

/*
Expand All @@ -1421,20 +1364,15 @@ pub(crate) async fn job_add_input_sync(
*/
let add = add.into_inner();
let addsize = if add.size < 0 || add.size > 1024 * 1024 * 1024 {
return Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("size {} must be between 0 and 1073741824", add.size),
return bad_request(format!(
"size {} must be between 0 and 1073741824",
add.size
));
} else {
add.size as u64
};
if add.name.contains('/') {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
"name must not be a path".into(),
));
return bad_request("name must not be a path");
}

let chunks = add
Expand All @@ -1456,11 +1394,7 @@ pub(crate) async fn job_add_input_sync(
addsize,
e,
);
return Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("{:?}", e),
));
return bad_request(format!("{e:?}"));
}
};

Expand Down
59 changes: 38 additions & 21 deletions server/src/api/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ pub(crate) async fn worker_job_append_one(
let j = c.db.job(path.into_inner().job()?).or_500()?; /* XXX */
w.owns(log, &j)?;

validate_stream(&a.stream)?;

info!(log, "worker {} append to job {} stream {}", w.id, j.id, a.stream);

c.db.job_append_event(
Expand Down Expand Up @@ -387,6 +389,10 @@ pub(crate) async fn worker_job_append(
let j = c.db.job(path.into_inner().job()?).or_500()?; /* XXX */
w.owns(log, &j)?;

for entry in &a {
validate_stream(&entry.stream)?;
}

info!(log, "worker {} append {} events to job {}", w.id, a.len(), j.id);

c.db.job_append_events(
Expand Down Expand Up @@ -423,6 +429,8 @@ pub(crate) async fn worker_task_append(
let j = c.db.job(p.job()?).or_500()?; /* XXX */
w.owns(log, &j)?;

validate_stream(&a.stream)?;

info!(
log,
"worker {} append to job {} task {} stream {}",
Expand Down Expand Up @@ -674,13 +682,9 @@ pub(crate) async fn worker_job_add_output(

let max = c.config.job.max_bytes_per_output();
if add.size > max {
return Err(HttpError::for_client_error(
None,
ClientErrorStatusCode::BAD_REQUEST,
format!(
"output file size {} bigger than allowed maximum {max} bytes",
add.size,
),
return bad_request(format!(
"output file size {} bigger than allowed maximum {max} bytes",
add.size,
));
}

Expand Down Expand Up @@ -726,11 +730,7 @@ pub(crate) async fn worker_job_add_output(
add.size,
e,
);
Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("{}", e),
))
return bad_request(e);
}
}
}
Expand Down Expand Up @@ -762,10 +762,9 @@ pub(crate) async fn worker_job_add_output_sync(
*/
let add = add.into_inner();
let addsize = if add.size < 0 || add.size > 1024 * 1024 * 1024 {
return Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("size {} must be between 0 and 1073741824", add.size),
return bad_request(format!(
"size {} must be between 0 and 1073741824",
add.size
));
} else {
add.size as u64
Expand Down Expand Up @@ -793,11 +792,7 @@ pub(crate) async fn worker_job_add_output_sync(
addsize,
e,
);
return Err(HttpError::for_client_error(
Some("invalid".to_string()),
ClientErrorStatusCode::BAD_REQUEST,
format!("{:?}", e),
));
return bad_request(format!("{e:?}"));
}
};

Expand Down Expand Up @@ -864,6 +859,10 @@ pub(crate) async fn worker_append(

let a = append.into_inner();

for entry in &a {
validate_stream(&entry.stream)?;
}

info!(log, "worker {} append {} self events", w.id, a.len());

c.db.worker_append_events(
Expand Down Expand Up @@ -925,3 +924,21 @@ pub(crate) async fn worker_diagnostics_enable(

Ok(HttpResponseUpdatedNoContent())
}

/*
 * Confirm that a client-supplied stream name is acceptable: at most 64
 * bytes long, and drawn only from ASCII letters, digits, '-', '_', and '.'.
 * Returns a 400-style error describing the first violation found.
 */
pub(crate) fn validate_stream(name: &str) -> DSResult<()> {
    if name.len() > 64 {
        return bad_request(format!(
            "stream name {name:?} is longer than 64 bytes"
        ));
    }

    /*
     * Scan for the first character outside the allowed set and report it.
     */
    let offending = name.chars().find(|&c| {
        !(c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.'))
    });

    match offending {
        Some(c) => bad_request(format!(
            "invalid char {c:?} in stream name {name:?}"
        )),
        None => Ok(()),
    }
}
Loading