From eefc3b9a5ce77b16e5f5971692704820660e7333 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Fri, 29 May 2026 16:33:42 -0400 Subject: [PATCH 1/4] Fix. --- Cargo.lock | 1 + components/log-ingestor/Cargo.toml | 1 + .../src/ingestion_job/sqs_listener.rs | 23 ++++++++++++++-- .../log-ingestor/tests/test_ingestion_job.rs | 26 +++++++++++-------- components/log-ingestor/tests/test_utils.rs | 26 +++++++++++++++---- 5 files changed, 59 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index abe78a8ca7..233485a5d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2227,6 +2227,7 @@ dependencies = [ "tokio-util", "tower-http", "tracing", + "url", "utoipa", "utoipa-axum", "uuid", diff --git a/components/log-ingestor/Cargo.toml b/components/log-ingestor/Cargo.toml index 9636a527db..814ac6a82b 100644 --- a/components/log-ingestor/Cargo.toml +++ b/components/log-ingestor/Cargo.toml @@ -41,6 +41,7 @@ tokio-util = "0.7.18" tracing = "0.1.44" tower-http = { version = "0.6.8", features = ["cors"] } thiserror = "2.0.18" +url = "2.5.8" utoipa = { version = "5.4.0", features = ["axum_extras"] } utoipa-axum = "0.2.0" diff --git a/components/log-ingestor/src/ingestion_job/sqs_listener.rs b/components/log-ingestor/src/ingestion_job/sqs_listener.rs index 1a8749236c..659d543142 100644 --- a/components/log-ingestor/src/ingestion_job/sqs_listener.rs +++ b/components/log-ingestor/src/ingestion_job/sqs_listener.rs @@ -11,6 +11,7 @@ use clp_rust_utils::{ s3::ObjectMetadata, sqs::event::{Record, S3}, }; +use non_empty_string::NonEmptyString; use tokio::select; use tokio_util::sync::CancellationToken; @@ -102,6 +103,7 @@ impl, State: IngestionJobState + /// * Forwards the following `aws_sdk_sqs` methods' return values on failure: /// * [`DeleteMessageBatchFluentBuilder::send`] /// * [`DeleteMessageBatchRequestEntryBuilder::build`] + /// * Forwards [`Self::extract_object_metadata`]'s return values on failure. /// * Forwards [`SqsListenerState::ingest`]'s return values on failure. 
async fn process_sqs_response(&self, response: ReceiveMessageOutput) -> Result { let Some(messages) = response.messages else { @@ -183,6 +185,9 @@ impl, State: IngestionJobState + /// Extracts S3 object metadata from the given SQS record if it corresponds to a relevant /// object. /// + /// The object key in an S3 event notification is URL-encoded (e.g., spaces are encoded as `+`). + /// It is decoded before being matched against the listener's configuration and returned. + /// /// # Returns /// /// * `Some(ObjectMetadata)` if the record corresponds to a relevant object. @@ -190,16 +195,30 @@ impl, State: IngestionJobState + /// * The event is not an object creation event. /// * The bucket name does not match the listener's configuration. /// * [`Self::is_relevant_object`] evaluates to `false`. + /// + /// # Panics + /// + /// Panics if the decoded key is empty after URL decoding. This should be unreachable at runtime + /// since the decoding result of a non-empty string should always be a non-empty string. 
fn extract_object_metadata(&self, record: Record) -> Option { if !record.event_name.starts_with("ObjectCreated:") || self.config.get().base.bucket_name != record.s3.bucket.name.as_str() - || !self.is_relevant_object(record.s3.object.key.as_str()) { return None; } + + let (decoded_key, _) = + url::form_urlencoded::parse(record.s3.object.key.as_bytes()).next()?; + let key = + NonEmptyString::new(decoded_key.into_owned()).expect("decoded key must be non-empty"); + + if !self.is_relevant_object(key.as_str()) { + return None; + } + Some(ObjectMetadata { bucket: record.s3.bucket.name, - key: record.s3.object.key, + key, size: record.s3.object.size, }) } diff --git a/components/log-ingestor/tests/test_ingestion_job.rs b/components/log-ingestor/tests/test_ingestion_job.rs index 2d86ac4f8b..75acbac6d4 100644 --- a/components/log-ingestor/tests/test_ingestion_job.rs +++ b/components/log-ingestor/tests/test_ingestion_job.rs @@ -142,11 +142,16 @@ pub async fn upload_noise_objects( test_utils::create_s3_objects(s3_client, objects_to_create).await } -/// Waits until the shared buffer has at least `min_num_objects`. +/// Waits until at least `min_num_objects` distinct objects have been ingested. +/// +/// SQS provides at-least-once delivery, so the same object may be ingested more than once. The +/// ingested objects are therefore deduplicated before counting (so that redelivered messages don't +/// cause an incomplete set to be returned) and the returned vector is sorted and free of +/// duplicates. /// /// # Returns /// -/// A vector of ingested S3 object metadata on success. +/// A sorted, deduplicated vector of ingested S3 object metadata on success. 
async fn wait_for_ingested_objects( shared_buffer: Arc>>, min_num_objects: usize, @@ -155,11 +160,12 @@ async fn wait_for_ingested_objects( Duration::from_secs(WAIT_FOR_INGESTED_OBJECTS_TIMEOUT_SEC), async { loop { - let ingested_objects = shared_buffer.lock().await; + let mut ingested_objects = shared_buffer.lock().await.clone(); + ingested_objects.sort(); + ingested_objects.dedup(); if ingested_objects.len() >= min_num_objects { - return ingested_objects.clone(); + return ingested_objects; } - drop(ingested_objects); tokio::time::sleep(Duration::from_millis(INGESTED_OBJECT_POLL_INTERVAL_MS)).await; } }, @@ -224,13 +230,12 @@ async fn run_sqs_listener_test( let mut created_objects = upload_handle .await .context("Error while awaiting test object upload")??; - let mut received_objects = - wait_for_ingested_objects(shared_buffer, created_objects.len()).await; + let received_objects = wait_for_ingested_objects(shared_buffer, created_objects.len()).await; sqs_listener.shutdown_and_join().await; created_objects.sort(); - received_objects.sort(); + // `received_objects` is already sorted and deduplicated by `wait_for_ingested_objects`. assert_eq!(received_objects, created_objects); Ok(()) @@ -346,12 +351,11 @@ async fn test_s3_scanner() -> Result<()> { let mut created_objects = test_upload_handle .await .context("Error while awaiting test object upload")??; - let mut received_objects = - wait_for_ingested_objects(shared_buffer, created_objects.len()).await; + let received_objects = wait_for_ingested_objects(shared_buffer, created_objects.len()).await; s3_scanner.shutdown_and_join().await; created_objects.sort(); - received_objects.sort(); + // `received_objects` is already sorted and deduplicated by `wait_for_ingested_objects`. 
assert_eq!(received_objects, created_objects); Ok(()) diff --git a/components/log-ingestor/tests/test_utils.rs b/components/log-ingestor/tests/test_utils.rs index 5b11443445..74ce957adf 100644 --- a/components/log-ingestor/tests/test_utils.rs +++ b/components/log-ingestor/tests/test_utils.rs @@ -40,7 +40,14 @@ pub fn get_testing_prefix_as_non_empty_string(job_id: IngestionJobId) -> NonEmpt /// Uploads test S3 objects. /// -/// Objects are created with keys formatted as `{prefix}/{i}.log` where `i` is the object index. +/// Objects are created with keys formatted as `{prefix}/idx{i}{placeholder}` where: +/// * `prefix` is the provided prefix. +/// * `i` is the object index. +/// * `placeholder` is one of the following to test URL encoding: +/// * `""` if `i` % 4 == 0. +/// * `&` if `i` % 4 == 1. +/// * `=` if `i` % 4 == 2. +/// * ` ` if `i` % 4 == 3. /// /// # Returns /// @@ -56,10 +63,19 @@ pub async fn upload_test_objects( num_objects_to_create: usize, ) -> Result> { let objects_to_create: Vec<_> = (0..num_objects_to_create) - .map(|idx| ObjectMetadata { - bucket: bucket.clone(), - key: NonEmptyString::from_string(format!("{prefix}/{idx:05}.log")), - size: 16, + .map(|idx| { + let placeholder = match idx % 4 { + 0 => "", + 1 => "&", + 2 => "=", + 3 => " ", + _ => unreachable!(), + }; + ObjectMetadata { + bucket: bucket.clone(), + key: NonEmptyString::from_string(format!("{prefix}/idx{idx:05}{placeholder}.log")), + size: 16, + } }) .collect(); From 908119cfc71726d61e1faf2f23ca99824ac02c9c Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Fri, 29 May 2026 16:36:37 -0400 Subject: [PATCH 2/4] Revert test case changes. 
--- .../log-ingestor/tests/test_ingestion_job.rs | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/components/log-ingestor/tests/test_ingestion_job.rs b/components/log-ingestor/tests/test_ingestion_job.rs index 75acbac6d4..2dd651ffbb 100644 --- a/components/log-ingestor/tests/test_ingestion_job.rs +++ b/components/log-ingestor/tests/test_ingestion_job.rs @@ -142,16 +142,11 @@ pub async fn upload_noise_objects( test_utils::create_s3_objects(s3_client, objects_to_create).await } -/// Waits until at least `min_num_objects` distinct objects have been ingested. -/// -/// SQS provides at-least-once delivery, so the same object may be ingested more than once. The -/// ingested objects are therefore deduplicated before counting (so that redelivered messages don't -/// cause an incomplete set to be returned) and the returned vector is sorted and free of -/// duplicates. +/// Waits until the shared buffer has at least `min_num_objects`. /// /// # Returns /// -/// A sorted, deduplicated vector of ingested S3 object metadata on success. +/// A vector of ingested S3 object metadata on success. async fn wait_for_ingested_objects( shared_buffer: Arc>>, min_num_objects: usize, @@ -160,18 +155,17 @@ async fn wait_for_ingested_objects( Duration::from_secs(WAIT_FOR_INGESTED_OBJECTS_TIMEOUT_SEC), async { loop { - let mut ingested_objects = shared_buffer.lock().await.clone(); - ingested_objects.sort(); - ingested_objects.dedup(); + let ingested_objects = shared_buffer.lock().await; if ingested_objects.len() >= min_num_objects { - return ingested_objects; + return ingested_objects.clone(); } + drop(ingested_objects); tokio::time::sleep(Duration::from_millis(INGESTED_OBJECT_POLL_INTERVAL_MS)).await; } }, ) - .await - .expect("Timed out while waiting for ingested objects") + .await + .expect("Timed out while waiting for ingested objects") } /// Runs SQS listener test with the given job config. 
@@ -192,7 +186,7 @@ async fn run_sqs_listener_test( Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let state = SqsListenerTestState::new(); let shared_buffer = state.get_shared_buffer(); @@ -210,7 +204,7 @@ async fn run_sqs_listener_test( Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let upload_handle = tokio::spawn(upload_test_objects( s3_client.clone(), @@ -230,12 +224,13 @@ async fn run_sqs_listener_test( let mut created_objects = upload_handle .await .context("Error while awaiting test object upload")??; - let received_objects = wait_for_ingested_objects(shared_buffer, created_objects.len()).await; + let mut received_objects = + wait_for_ingested_objects(shared_buffer, created_objects.len()).await; sqs_listener.shutdown_and_join().await; created_objects.sort(); - // `received_objects` is already sorted and deduplicated by `wait_for_ingested_objects`. + received_objects.sort(); assert_eq!(received_objects, created_objects); Ok(()) @@ -279,8 +274,8 @@ async fn test_sqs_listener() -> Result<()> { buffer_config: BufferConfig::default(), }, ) - .await - .unwrap_or_else(|_| panic!("SQS listener test failed. Num tasks: {num_tasks}")); + .await + .unwrap_or_else(|_| panic!("SQS listener test failed. Num tasks: {num_tasks}")); } Ok(()) @@ -306,7 +301,7 @@ async fn test_s3_scanner() -> Result<()> { Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let s3_scanner_config = S3ScannerConfig { base: BaseConfig { @@ -351,11 +346,12 @@ async fn test_s3_scanner() -> Result<()> { let mut created_objects = test_upload_handle .await .context("Error while awaiting test object upload")??; - let received_objects = wait_for_ingested_objects(shared_buffer, created_objects.len()).await; + let mut received_objects = + wait_for_ingested_objects(shared_buffer, created_objects.len()).await; s3_scanner.shutdown_and_join().await; created_objects.sort(); - // `received_objects` is already sorted and deduplicated by `wait_for_ingested_objects`. 
+ received_objects.sort(); assert_eq!(received_objects, created_objects); Ok(()) From 34d40b015e90ee66422da023af42c4af6d4c1b56 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Fri, 29 May 2026 16:37:46 -0400 Subject: [PATCH 3/4] Done. --- .../log-ingestor/tests/test_ingestion_job.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/components/log-ingestor/tests/test_ingestion_job.rs b/components/log-ingestor/tests/test_ingestion_job.rs index 2dd651ffbb..2d86ac4f8b 100644 --- a/components/log-ingestor/tests/test_ingestion_job.rs +++ b/components/log-ingestor/tests/test_ingestion_job.rs @@ -164,8 +164,8 @@ async fn wait_for_ingested_objects( } }, ) - .await - .expect("Timed out while waiting for ingested objects") + .await + .expect("Timed out while waiting for ingested objects") } /// Runs SQS listener test with the given job config. @@ -186,7 +186,7 @@ async fn run_sqs_listener_test( Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let state = SqsListenerTestState::new(); let shared_buffer = state.get_shared_buffer(); @@ -204,7 +204,7 @@ async fn run_sqs_listener_test( Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let upload_handle = tokio::spawn(upload_test_objects( s3_client.clone(), @@ -274,8 +274,8 @@ async fn test_sqs_listener() -> Result<()> { buffer_config: BufferConfig::default(), }, ) - .await - .unwrap_or_else(|_| panic!("SQS listener test failed. Num tasks: {num_tasks}")); + .await + .unwrap_or_else(|_| panic!("SQS listener test failed. 
Num tasks: {num_tasks}")); } Ok(()) @@ -301,7 +301,7 @@ async fn test_s3_scanner() -> Result<()> { Some(&aws_config.endpoint), &aws_auth, ) - .await; + .await; let s3_scanner_config = S3ScannerConfig { base: BaseConfig { From 8f13e92117dffffb2fbf3cf35154eeef5e74f5a9 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Wed, 3 Jun 2026 18:04:36 -0400 Subject: [PATCH 4/4] Fix docstring. --- components/log-ingestor/tests/test_utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/components/log-ingestor/tests/test_utils.rs b/components/log-ingestor/tests/test_utils.rs index 74ce957adf..4ef11fa358 100644 --- a/components/log-ingestor/tests/test_utils.rs +++ b/components/log-ingestor/tests/test_utils.rs @@ -41,6 +41,7 @@ pub fn get_testing_prefix_as_non_empty_string(job_id: IngestionJobId) -> NonEmpt /// Uploads test S3 objects. /// /// Objects are created with keys formatted as `{prefix}/idx{i}{placeholder}` where: +/// /// * `prefix` is the provided prefix. /// * `i` is the object index. /// * `placeholder` is one of the following to test URL encoding: