diff --git a/Cargo.lock b/Cargo.lock index abe78a8ca7..233485a5d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2227,6 +2227,7 @@ dependencies = [ "tokio-util", "tower-http", "tracing", + "url", "utoipa", "utoipa-axum", "uuid", diff --git a/components/log-ingestor/Cargo.toml b/components/log-ingestor/Cargo.toml index 9636a527db..814ac6a82b 100644 --- a/components/log-ingestor/Cargo.toml +++ b/components/log-ingestor/Cargo.toml @@ -41,6 +41,7 @@ tokio-util = "0.7.18" tracing = "0.1.44" tower-http = { version = "0.6.8", features = ["cors"] } thiserror = "2.0.18" +url = "2.5.8" utoipa = { version = "5.4.0", features = ["axum_extras"] } utoipa-axum = "0.2.0" diff --git a/components/log-ingestor/src/ingestion_job/sqs_listener.rs b/components/log-ingestor/src/ingestion_job/sqs_listener.rs index 1a8749236c..659d543142 100644 --- a/components/log-ingestor/src/ingestion_job/sqs_listener.rs +++ b/components/log-ingestor/src/ingestion_job/sqs_listener.rs @@ -11,6 +11,7 @@ use clp_rust_utils::{ s3::ObjectMetadata, sqs::event::{Record, S3}, }; +use non_empty_string::NonEmptyString; use tokio::select; use tokio_util::sync::CancellationToken; @@ -102,6 +103,7 @@ impl, State: IngestionJobState + /// * Forwards the following `aws_sdk_sqs` methods' return values on failure: /// * [`DeleteMessageBatchFluentBuilder::send`] /// * [`DeleteMessageBatchRequestEntryBuilder::build`] + /// * Forwards [`Self::extract_object_metadata`]'s return values on failure. /// * Forwards [`SqsListenerState::ingest`]'s return values on failure. async fn process_sqs_response(&self, response: ReceiveMessageOutput) -> Result { let Some(messages) = response.messages else { @@ -183,6 +185,9 @@ impl, State: IngestionJobState + /// Extracts S3 object metadata from the given SQS record if it corresponds to a relevant /// object. /// + /// The object key in an S3 event notification is URL-encoded (e.g., spaces are encoded as `+`). 
+ /// It is decoded before being matched against the listener's configuration and returned. + /// /// # Returns /// /// * `Some(ObjectMetadata)` if the record corresponds to a relevant object. @@ -190,16 +195,30 @@ impl, State: IngestionJobState + /// * The event is not an object creation event. /// * The bucket name does not match the listener's configuration. /// * [`Self::is_relevant_object`] evaluates to `false`. + /// + /// # Panics + /// + /// Panics if the decoded key is empty after URL decoding. This should be unreachable at runtime + /// since the decoding result of a non-empty string should always be a non-empty string. fn extract_object_metadata(&self, record: Record) -> Option { if !record.event_name.starts_with("ObjectCreated:") || self.config.get().base.bucket_name != record.s3.bucket.name.as_str() - || !self.is_relevant_object(record.s3.object.key.as_str()) { return None; } + + let (decoded_key, _) = + url::form_urlencoded::parse(record.s3.object.key.as_bytes()).next()?; + let key = + NonEmptyString::new(decoded_key.into_owned()).expect("decoded key must be non-empty"); + + if !self.is_relevant_object(key.as_str()) { + return None; + } + Some(ObjectMetadata { bucket: record.s3.bucket.name, - key: record.s3.object.key, + key, size: record.s3.object.size, }) } diff --git a/components/log-ingestor/tests/test_utils.rs b/components/log-ingestor/tests/test_utils.rs index 5b11443445..4ef11fa358 100644 --- a/components/log-ingestor/tests/test_utils.rs +++ b/components/log-ingestor/tests/test_utils.rs @@ -40,7 +40,15 @@ pub fn get_testing_prefix_as_non_empty_string(job_id: IngestionJobId) -> NonEmpt /// Uploads test S3 objects. /// -/// Objects are created with keys formatted as `{prefix}/{i}.log` where `i` is the object index. +/// Objects are created with keys formatted as `{prefix}/idx{i}{placeholder}.log` where: +/// +/// * `prefix` is the provided prefix. +/// * `i` is the object index, zero-padded to 5 digits. 
+/// * `placeholder` is one of the following, chosen to exercise URL encoding in object keys: +/// * `""` (empty) if `i % 4 == 0`. +/// * `&` if `i % 4 == 1`. +/// * `=` if `i % 4 == 2`. +/// * ` ` (a space) if `i % 4 == 3`. /// /// # Returns /// @@ -56,10 +64,19 @@ pub async fn upload_test_objects( num_objects_to_create: usize, ) -> Result> { let objects_to_create: Vec<_> = (0..num_objects_to_create) - .map(|idx| ObjectMetadata { - bucket: bucket.clone(), - key: NonEmptyString::from_string(format!("{prefix}/{idx:05}.log")), - size: 16, + .map(|idx| { + let placeholder = match idx % 4 { + 0 => "", + 1 => "&", + 2 => "=", + 3 => " ", + _ => unreachable!(), + }; + ObjectMetadata { + bucket: bucket.clone(), + key: NonEmptyString::from_string(format!("{prefix}/idx{idx:05}{placeholder}.log")), + size: 16, + } }) .collect();