From 878c85ca475310615ca7d887f0b66598e7b044eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cohen?= Date: Wed, 18 Feb 2026 16:24:32 +0100 Subject: [PATCH 1/3] feat: Make elastic TermsQuery use TermSetQuery internally (#6151) * feat: Make elastic TermsQuery use TermSetQuery internally Signed-off-by: Darkheir * Support non-indexed fast fields in TermSetQuery Signed-off-by: Darkheir --------- Signed-off-by: Darkheir --- .../src/elastic_query_dsl/term_query.rs | 1 + .../src/elastic_query_dsl/terms_query.rs | 23 ++-- .../src/query_ast/term_set_query.rs | 121 +++++++++++++++++- 3 files changed, 129 insertions(+), 16 deletions(-) diff --git a/quickwit/quickwit-query/src/elastic_query_dsl/term_query.rs b/quickwit/quickwit-query/src/elastic_query_dsl/term_query.rs index d7d1ea72d92..5fd320a2580 100644 --- a/quickwit/quickwit-query/src/elastic_query_dsl/term_query.rs +++ b/quickwit/quickwit-query/src/elastic_query_dsl/term_query.rs @@ -75,6 +75,7 @@ pub struct TermQueryParams { case_insensitive: bool, } +#[cfg(test)] pub fn term_query_from_field_value(field: impl ToString, value: impl ToString) -> TermQuery { TermQuery { field: field.to_string(), diff --git a/quickwit/quickwit-query/src/elastic_query_dsl/terms_query.rs b/quickwit/quickwit-query/src/elastic_query_dsl/terms_query.rs index 479b21df82c..48b65e4dd29 100644 --- a/quickwit/quickwit-query/src/elastic_query_dsl/terms_query.rs +++ b/quickwit/quickwit-query/src/elastic_query_dsl/terms_query.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{BTreeSet, HashMap}; + use serde::Deserialize; -use crate::elastic_query_dsl::bool_query::BoolQuery; use crate::elastic_query_dsl::one_field_map::OneFieldMap; -use crate::elastic_query_dsl::term_query::term_query_from_field_value; use crate::elastic_query_dsl::{ConvertibleToQueryAst, ElasticQueryDslInner}; use crate::not_nan_f32::NotNaNf32; -use crate::query_ast::QueryAst; +use crate::query_ast::{QueryAst, TermSetQuery}; #[derive(PartialEq, Eq, Debug, Deserialize, Clone)] #[serde(try_from = "TermsQueryForSerialization")] @@ -87,15 +87,14 @@ impl TryFrom for TermsQuery { impl ConvertibleToQueryAst for TermsQuery { fn convert_to_query_ast(self) -> anyhow::Result { - let term_queries: Vec = self - .values - .into_iter() - .map(|value| term_query_from_field_value(self.field.clone(), value)) - .map(ElasticQueryDslInner::from) - .collect(); - let mut union = BoolQuery::union(term_queries); - union.boost = self.boost; - union.convert_to_query_ast() + let mut terms_per_field = HashMap::new(); + let values_set: BTreeSet = self.values.into_iter().collect(); + terms_per_field.insert(self.field, values_set); + + let term_set_query = TermSetQuery { terms_per_field }; + let query_ast: QueryAst = term_set_query.into(); + + Ok(query_ast.boost(self.boost)) } } diff --git a/quickwit/quickwit-query/src/query_ast/term_set_query.rs b/quickwit/quickwit-query/src/query_ast/term_set_query.rs index d25d01a4703..0605d854863 100644 --- a/quickwit/quickwit-query/src/query_ast/term_set_query.rs +++ b/quickwit/quickwit-query/src/query_ast/term_set_query.rs @@ -19,7 +19,7 @@ use tantivy::Term; use crate::InvalidQuery; use crate::query_ast::{ - BuildTantivyAst, BuildTantivyAstContext, QueryAst, TantivyQueryAst, TermQuery, + BoolQuery, BuildTantivyAst, BuildTantivyAstContext, QueryAst, TantivyQueryAst, TermQuery, }; /// TermSetQuery matches the same document set as if it was a union of @@ -32,6 +32,53 @@ pub struct TermSetQuery { } impl TermSetQuery { + fn has_fast_only_field(&self, context: &BuildTantivyAstContext) -> bool { + for full_path in self.terms_per_field.keys() { + if let Some((_, field_entry, _)) = + super::utils::find_field_or_hit_dynamic(full_path, context.schema) + && field_entry.is_fast() + && !field_entry.is_indexed() + { + return true; + } + } + false + } + + fn build_bool_query( + &self, + context: &BuildTantivyAstContext, + ) -> Result { + let should_clauses = self + .terms_per_field + .iter() + .flat_map(|(full_path, values)| { + values.iter().map(|value| { + QueryAst::Term(TermQuery { + field: full_path.to_string(), + value: value.to_string(), + }) + }) + }) + .collect(); + + let bool_query = BoolQuery { + should: should_clauses, + ..Default::default() + }; + + bool_query.build_tantivy_ast_impl(context) + } + + fn build_term_set_query( + &self, + context: &BuildTantivyAstContext, + ) -> Result { + let terms_it = self.make_term_iterator(context)?; + let term_set_query = tantivy::query::TermSetQuery::new(terms_it); + Ok(term_set_query.into()) + } + fn make_term_iterator( &self, context: &BuildTantivyAstContext, @@ -67,9 +114,11 @@ impl BuildTantivyAst for TermSetQuery { &self, context: &BuildTantivyAstContext, ) -> Result { - let terms_it = self.make_term_iterator(context)?; - let term_set_query = tantivy::query::TermSetQuery::new(terms_it); - Ok(term_set_query.into()) + if self.has_fast_only_field(context) { + self.build_bool_query(context) + } else { + self.build_term_set_query(context) + } } } @@ -78,3 +127,67 @@ impl From for QueryAst { QueryAst::TermSet(term_set_query) } } + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashMap}; + + use tantivy::schema::{FAST, INDEXED, Schema}; + + use super::TermSetQuery; + use crate::query_ast::{BuildTantivyAst, BuildTantivyAstContext}; + + #[test] + fn test_term_set_query_with_fast_only_field_returns_bool_query() { + let mut schema_builder = Schema::builder(); + schema_builder.add_u64_field("fast_field", FAST); + let schema = schema_builder.build(); + + let terms_per_field = HashMap::from([( + "fast_field".to_string(), + BTreeSet::from(["1".to_string(), "2".to_string()]), + )]); + let term_set_query = TermSetQuery { terms_per_field }; + + let tantivy_query_ast = term_set_query + .build_tantivy_ast_call(&BuildTantivyAstContext::for_test(&schema)) + .unwrap(); + + let bool_query = tantivy_query_ast + .as_bool_query() + .expect("Expected BoolQuery for fast-only field, but got a different query type"); + assert_eq!(bool_query.should.len(), 2); + assert_eq!(bool_query.must.len(), 0); + assert_eq!(bool_query.must_not.len(), 0); + assert_eq!(bool_query.filter.len(), 0); + } + + #[test] + fn test_term_set_query_with_indexed_field_uses_term_set() { + let mut schema_builder = Schema::builder(); + schema_builder.add_u64_field("indexed_field", FAST | INDEXED); + let schema = schema_builder.build(); + + let terms_per_field = HashMap::from([( + "indexed_field".to_string(), + BTreeSet::from(["1".to_string(), "2".to_string()]), + )]); + let term_set_query = TermSetQuery { terms_per_field }; + + let tantivy_query_ast = term_set_query + .build_tantivy_ast_call(&BuildTantivyAstContext::for_test(&schema)) + .unwrap(); + + // Should return a leaf query (TermSetQuery wrapped in TantivyQueryAst) + let leaf = tantivy_query_ast + .as_leaf() + .expect("Expected a leaf query (TermSetQuery), but got a complex query"); + + // Verify it's a TermSetQuery by checking the debug representation + let debug_str = format!("{leaf:?}"); + assert!( + debug_str.contains("TermSetQuery"), + "Expected TermSetQuery, got: {debug_str}" + ); + } +} From 3e0670a3028e209105d2da18f360fd1592939be7 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 11 Feb 2026 10:52:14 +0100 Subject: [PATCH 2/3] Add SSE-C encryption config --- .github/workflows/ci.yml | 8 +- .github/workflows/ui-ci.yml | 1 + quickwit/quickwit-config/src/lib.rs | 3 +- .../quickwit-config/src/storage_config.rs | 59 ++ .../object_storage/s3_compatible_storage.rs | 745 +++++++++++++++++- 5 files changed, 800 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ea79141fe6..7af5fbda950 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,7 +64,9 @@ jobs: - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 - name: Install Ubuntu packages - run: sudo apt-get -y install protobuf-compiler + run: | + sudo apt-get update + sudo apt-get -y install protobuf-compiler - uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v.6.1.0 with: python-version: '3.11' @@ -132,7 +134,9 @@ jobs: - .github/workflows/ci.yml - name: Install Ubuntu packages if: always() && steps.modified.outputs.rust_src == 'true' - run: sudo apt-get -y install protobuf-compiler + run: | + sudo apt-get update + sudo apt-get -y install protobuf-compiler - name: Setup nightly Rust Toolchain (for rustfmt) if: steps.modified.outputs.rust_src == 'true' uses: dtolnay/rust-toolchain@f7ccc83f9ed1e5b9c81d8a67d7ad1a747e22a561 # master diff --git a/.github/workflows/ui-ci.yml b/.github/workflows/ui-ci.yml index 1d1a4f93b1e..f0d0e86c6a9 100644 --- a/.github/workflows/ui-ci.yml +++ b/.github/workflows/ui-ci.yml @@ -30,6 +30,7 @@ jobs: task: - name: Cypress run command: | + sudo apt-get update sudo apt-get -y install protobuf-compiler CI=false yarn --cwd quickwit-ui build RUSTFLAGS="--cfg tokio_unstable" cargo build --features=postgres diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 04771add011..b1d675fc1f6 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,7 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + S3EncryptionConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 3547a33e483..52daffdb537 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -308,6 +308,36 @@ impl fmt::Debug for AzureStorageConfig { } } +#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum S3EncryptionConfig { + /// This is the standard AES256 SSE-C header config. Key is expected to be a + /// 256bit base64-encoded string, and key_md5 is expected to be the + /// base64-encoded MD5 digest of the (binary) key. Akamai gen1 buckets don't + /// respect this (only the a 32 hex char key is expected). + SseC { + key: String, + key_md5: String, + read_only: bool, + }, +} + +impl fmt::Debug for S3EncryptionConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + S3EncryptionConfig::SseC { + key_md5, read_only, .. + } => f + .debug_struct("S3EncryptionConfig") + .field("type", &"sse_c") + .field("key", &"***redacted***") + .field("key_md5", key_md5) + .field("read_only", read_only) + .finish(), + } + } +} + #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -329,6 +359,8 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + pub encryption: Option, } impl S3StorageConfig { @@ -685,4 +717,31 @@ mod tests { assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO)); } } + + #[test] + fn test_storage_s3_config_encryption_serde() { + { + let s3_storage_config_yaml = r#" + endpoint: http://localhost:4566 + encryption: + type: sse_c + key: test-customer-key + key_md5: test-customer-key-md5 + read_only: true + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + + let expected_s3_config = S3StorageConfig { + endpoint: Some("http://localhost:4566".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "test-customer-key".to_string(), + key_md5: "test-customer-key-md5".to_string(), + read_only: true, + }), + ..Default::default() + }; + assert_eq!(s3_storage_config, expected_s3_config); + } + } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 1401a48d998..ecce3c795da 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -38,7 +38,7 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use quickwit_config::{S3EncryptionConfig, S3StorageConfig}; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -89,6 +89,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + encryption: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -97,6 +98,7 @@ impl fmt::Debug for S3CompatibleObjectStorage { .debug_struct("S3CompatibleObjectStorage") .field("bucket", &self.bucket) .field("prefix", &self.prefix) + .field("encryption", &self.encryption) .finish() } } @@ -184,6 +186,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + encryption: s3_storage_config.encryption.clone(), }) } @@ -201,6 +204,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + encryption: self.encryption, } } @@ -289,12 +293,30 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - self.s3_client + let mut req_builder = self + .s3_client .put_object() .bucket(bucket) .key(key) .body(body) - .content_length(len as i64) + .content_length(len as i64); + match &self.encryption { + Some(S3EncryptionConfig::SseC { + key, + key_md5, + read_only: false, + }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + Some(S3EncryptionConfig::SseC { + read_only: true, .. + }) => {} + None => {} + } + req_builder .send() .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await @@ -326,10 +348,28 @@ impl S3CompatibleObjectStorage { async fn create_multipart_upload(&self, key: &str) -> StorageResult { let upload_id = aws_retry(&self.retry_params, || async { - self.s3_client + let mut req_builder = self + .s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(key) + .key(key); + match &self.encryption { + Some(S3EncryptionConfig::SseC { + key, + key_md5, + read_only: false, + }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + Some(S3EncryptionConfig::SseC { + read_only: true, .. + }) => {} + None => {} + } + req_builder .send() .with_count_metric(ActionLabel::CreateMultipartUpload) .await @@ -421,7 +461,7 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - let upload_part_output = self + let mut req_builder = self .s3_client .upload_part() .bucket(self.bucket.clone()) @@ -430,7 +470,24 @@ impl S3CompatibleObjectStorage { .content_length(part.len() as i64) .content_md5(md5) .part_number(part.part_number as i32) - .upload_id(upload_id.0) + .upload_id(upload_id.0); + match &self.encryption { + Some(S3EncryptionConfig::SseC { + key, + key_md5, + read_only: false, + }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + Some(S3EncryptionConfig::SseC { + read_only: true, .. + }) => {} + None => {} + } + let upload_part_output = req_builder .send() .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await @@ -542,12 +599,26 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - let get_object_output = self + let mut req_builder = self .s3_client .get_object() .bucket(self.bucket.clone()) .key(key) - .set_range(range_str) + .set_range(range_str); + match &self.encryption { + Some(S3EncryptionConfig::SseC { + key, + key_md5, + read_only: _, + }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + None => {} + } + let get_object_output = req_builder .send() .with_count_and_duration_metrics(ActionLabel::GetObject) .await?; @@ -843,10 +914,21 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let head_object_output = aws_retry(&self.retry_params, || async { - self.s3_client - .head_object() - .bucket(&bucket) - .key(&key) + let mut req_builder = self.s3_client.head_object().bucket(&bucket).key(&key); + match &self.encryption { + Some(S3EncryptionConfig::SseC { + key, + key_md5, + read_only: _, + }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + None => {} + } + req_builder .send() .with_count_metric(ActionLabel::HeadObject) .await @@ -948,6 +1030,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -995,6 +1078,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1032,6 +1116,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1114,6 +1199,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1205,10 +1291,643 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) .await .unwrap(); } + + #[tokio::test] + async fn test_sse_c_headers_in_regular_put() { + for read_only in [false, true] { + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::empty()) + .unwrap(), + )]); + + let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock"); + let config = aws_sdk_s3::Config::builder() + .behavior_version(aws_behavior_version()) + .region(Some(Region::new("Foo"))) + .http_client(client.clone()) + .credentials_provider(credentials) + .build(); + + let uri = Uri::for_test("s3://test-bucket/prefix"); + + let s3_client = S3Client::from_conf(config.clone()); + let s3_storage = S3CompatibleObjectStorage { + s3_client, + uri: uri.clone(), + bucket: "test-bucket".to_string(), + prefix: PathBuf::from("prefix"), + multipart_policy: MultiPartPolicy::default(), + retry_params: RetryParams::for_test(), + disable_multi_object_delete: false, + disable_multipart_upload: false, + encryption: Some(S3EncryptionConfig::SseC { + key: "dGVzdGtleWZvcmVuY3J5cHRpb24xMjM0NTY3OA==".to_string(), + key_md5: "SomeBase64MD5Value=".to_string(), + read_only, + }), + }; + + let small_payload = vec![1u8; 100]; + let _ = s3_storage + .put(Path::new("small-file"), Box::new(small_payload)) + .await; + + let requests = client.actual_requests().collect::>(); + assert_eq!(requests.len(), 1, "Expected exactly 1 PutObject requests"); + + let headers = requests[0].headers(); + if !read_only { + assert!(headers.contains_key("x-amz-server-side-encryption-customer-algorithm")); + assert!(headers.contains_key("x-amz-server-side-encryption-customer-key")); + assert!(headers.contains_key("x-amz-server-side-encryption-customer-key-md5")); + } + if read_only { + assert!(!headers.contains_key("x-amz-server-side-encryption-customer-algorithm")); + assert!(!headers.contains_key("x-amz-server-side-encryption-customer-key")); + assert!(!headers.contains_key("x-amz-server-side-encryption-customer-key-md5")); + } + } + } + + #[tokio::test] + async fn test_sse_c_headers_in_multipart_upload() { + for read_only in [false, true] { + let client = StaticReplayClient::new(vec![ + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from( + r#" + + test-bucket + large-file + test-upload-id + "#, + )) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .header("ETag", "\"etag1\"") + .body(SdkBody::empty()) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .header("ETag", "\"etag2\"") + .body(SdkBody::empty()) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::empty()) + .unwrap(), + ), + ]); + + let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock"); + let config = aws_sdk_s3::Config::builder() + .behavior_version(aws_behavior_version()) + .region(Some(Region::new("Foo"))) + .http_client(client.clone()) + .credentials_provider(credentials) + .build(); + + let s3_client = S3Client::from_conf(config); + let uri = Uri::for_test("s3://test-bucket/prefix"); + + // Use a custom multipart policy with a low threshold to trigger multipart + let multipart_policy = MultiPartPolicy { + target_part_num_bytes: 5 * 1024 * 1024, // 5MB parts + multipart_threshold_num_bytes: 10 * 1024 * 1024, // 10MB threshold + max_num_parts: 10_000, + max_object_num_bytes: 5_000_000_000_000u64, + max_concurrent_uploads: 100, + }; + + let s3_storage = S3CompatibleObjectStorage { + s3_client, + uri, + bucket: "test-bucket".to_string(), + prefix: PathBuf::from("prefix"), + multipart_policy, + retry_params: RetryParams::for_test(), + disable_multi_object_delete: false, + disable_multipart_upload: false, + encryption: Some(S3EncryptionConfig::SseC { + key: "dGVzdGtleWZvcmVuY3J5cHRpb24xMjM0NTY3OA==".to_string(), + key_md5: "SomeBase64MD5Value=".to_string(), + read_only, + }), + }; + + // Test multipart upload with large payload that triggers multipart (15MB > 10MB + // threshold) + let large_payload = vec![2u8; 15 * 1024 * 1024]; // 15MB to trigger multipart + let _ = s3_storage + .put(Path::new("large-file"), Box::new(large_payload)) + .await; + + // Verify captured requests have SSE-C headers + let requests = client.actual_requests().collect::>(); + + // Should have: CreateMultipartUpload + N UploadParts + CompleteMultipartUpload + assert!( + requests.len() >= 3, + "Expected at least 3 requests got {}", + requests.len() + ); + + // Check CreateMultipartUpload and UploadPart requests for SSE-C headers + // CompleteMultipartUpload does not require SSE-C headers + for (i, request) in requests[..requests.len() - 1].iter().enumerate() { + let headers = request.headers(); + if read_only { + assert!( + !headers.contains_key("x-amz-server-side-encryption-customer-algorithm"), + "request {i} failed" + ); + assert!( + !headers.contains_key("x-amz-server-side-encryption-customer-key"), + "request {i} failed" + ); + assert!( + !headers.contains_key("x-amz-server-side-encryption-customer-key-md5"), + "request {i} failed" + ); + } + if !read_only { + assert!( + headers.contains_key("x-amz-server-side-encryption-customer-algorithm"), + "request {i} failed" + ); + assert!( + headers.contains_key("x-amz-server-side-encryption-customer-key"), + "request {i} failed" + ); + assert!( + headers.contains_key("x-amz-server-side-encryption-customer-key-md5"), + "request {i} failed" + ); + } + } + } + } +} + +/// These tests serve as a playground to test how S3 providers react to +/// encryption headers. They require valid credentials to be configured as +/// environment variables. They are ignored by default and need to be run +/// ad-hoc. +#[cfg(test)] +mod provider_tests { + use std::path::Path; + + use aws_sdk_s3::types::ServerSideEncryption; + use quickwit_common::uri::Uri; + use quickwit_config::{S3EncryptionConfig, S3StorageConfig}; + + use crate::{MultiPartPolicy, S3CompatibleObjectStorage, Storage}; + + /// Checks that a file was encrypted with a managed encryption (SSE-S3) + /// + /// Does not work with SSE-C (meta headers are not the same) + async fn assert_auto_encrypted(storage: &S3CompatibleObjectStorage, path: &Path) { + let meta = storage + .s3_client + .head_object() + .bucket(&storage.bucket) + .key(storage.key(path)) + .send() + .await + .unwrap(); + + assert_eq!( + meta.server_side_encryption(), + Some(&ServerSideEncryption::Aes256) + ); + } + + #[tokio::test] + #[ignore] + async fn test_akamai_s3_encryption() { + let access_key_id = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_SECRET_ACCESS_KEY", true); + let endpoint = quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ENDPOINT_URL", false); + let bucket = "s3://remi-encryption-tests"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + read_only: false, + }), + + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // On Akamai, you can read plain objects with the encryption headers + let plain_obj_enc_read = storage_enc.get_all(Path::new("hello_plain")).await.unwrap(); + assert_eq!(plain_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // Nominal plain path (write and read without the key) + // no SSE-S3 on Akamai, so no auto-encryption + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + + storage_enc.delete(Path::new("hello_enc")).await.unwrap(); + storage_enc.delete(Path::new("hello_plain")).await.unwrap(); + } + + #[tokio::test] + #[ignore] + async fn test_akamai_s3_sse_c_multipart() { + let access_key_id = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_SECRET_ACCESS_KEY", true); + let endpoint = quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ENDPOINT_URL", false); + let bucket = "s3://remi-encryption-tests"; + + let multipart_policy = MultiPartPolicy { + target_part_num_bytes: 5 * 1024 * 1024, + multipart_threshold_num_bytes: 10 * 1024 * 1024, + max_num_parts: 10_000, + max_object_num_bytes: 5_000_000_000_000u64, + max_concurrent_uploads: 100, + }; + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + read_only: false, + }), + ..Default::default() + }; + let mut storage_enc = + S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + storage_enc.multipart_policy = multipart_policy; + + storage_enc + .put( + Path::new("hello_multipart_enc"), + Box::new(vec![2u8; 15 * 1024 * 1024]), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc + .get_all(Path::new("hello_multipart_enc")) + .await + .unwrap(); + assert_eq!(enc_obj_enc_read, vec![2u8; 15 * 1024 * 1024].as_slice()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_multipart_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, 15 * 1024 * 1024); + } + + #[tokio::test] + #[ignore] + async fn test_aws_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_AWS_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AWS_S3_SECRET_ACCESS_KEY", false); + let bucket = "s3://amzn-enc-test-sk"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-1".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + read_only: false, + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-1".to_string()), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // On AWS S3, you cannot read plain objects with the encryption headers + storage_enc + .get_all(Path::new("hello_plain")) + .await + .unwrap_err(); + + // SSE-S3 enabled on all AWS buckets by default + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + assert_auto_encrypted(&storage_plain, Path::new("hello_plain")).await; + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } + + /// Requires an OVH bucket **with encryption enabled**. + #[tokio::test] + #[ignore] + async fn test_ovh_omk_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_OVH_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_OVH_S3_SECRET_ACCESS_KEY", false); + let endpoint = quickwit_common::get_from_env_opt("TEST_OVH_S3_ENDPOINT_URL", false); + // OMK enabled on this bucket + let bucket = "s3://ambitious-walton"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + read_only: false, + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // When the object was encrypted using OMK, you cannot read plain + // objects with the encryption headers + storage_enc + .get_all(Path::new("hello_plain")) + .await + .unwrap_err(); + + // Nominal plain-text path. The auto-encryption assertion will pass only + // on OVH buckets with SSE-OMK + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + assert_auto_encrypted(&storage_plain, Path::new("hello_plain")).await; + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } + + /// Requires an OVH bucket **with encryption disabled**. + #[tokio::test] + #[ignore] + async fn test_ovh_plain_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_OVH_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_OVH_S3_SECRET_ACCESS_KEY", false); + let endpoint = quickwit_common::get_from_env_opt("TEST_OVH_S3_ENDPOINT_URL", false); + // OMK disabled on this bucket + let bucket = "s3://dramatic-akasaki-plaintext"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + read_only: false, + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + + // When encryption is disabled on the bucket, you can read plain-text objects with the + // encryption headers + let plain_obj_enc_read = storage_enc.get_all(Path::new("hello_plain")).await.unwrap(); + assert_eq!(plain_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // Nominal plain-text path (write and read without the key) + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } } From bf44bef880f42bc70b2c3b8bb6fc2eebbacf4ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cohen?= Date: Wed, 21 Jan 2026 21:25:57 +0100 Subject: [PATCH 3/3] feat: String as sort in elasticsearch endpoints (#6108) Signed-off-by: Darkheir --- .../elasticsearch_api/model/search_body.rs | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_body.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_body.rs index d2850349aa2..501620ebd11 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_body.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_body.rs @@ -134,7 +134,17 @@ impl<'de> Visitor<'de> for FieldSortVecVisitor { type Value = Vec; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("An array containing the sort fields.") + formatter.write_str("A string, array, or object containing the sort fields.") + } + + fn visit_str(self, field_name: &str) -> Result, E> + where E: serde::de::Error { + let order = default_elasticsearch_sort_order(field_name); + Ok(vec![SortField { + field: field_name.to_string(), + order, + date_format: None, + }]) } fn visit_seq(self, mut seq: A) -> Result, A::Error> @@ -231,6 +241,20 @@ mod tests { assert_eq!(field_sorts[1].order, SortOrder::Asc); } + #[test] + fn test_sort_field_str() { + let json = r#" + { + "sort": "timestamp" + } + "#; + let search_body: SearchBody = serde_json::from_str(json).unwrap(); + let field_sorts = search_body.sort.unwrap(); + assert_eq!(field_sorts.len(), 1); + assert_eq!(field_sorts[0].field, "timestamp"); + assert_eq!(field_sorts[0].order, SortOrder::Asc); + } + #[test] fn test_sort_default_orders() { let json = r#"