Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ jobs:

- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Install Ubuntu packages
run: sudo apt-get -y install protobuf-compiler
run: |
sudo apt-get update
sudo apt-get -y install protobuf-compiler
- uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v.6.1.0
with:
python-version: '3.11'
Expand Down Expand Up @@ -132,7 +134,9 @@ jobs:
- .github/workflows/ci.yml
- name: Install Ubuntu packages
if: always() && steps.modified.outputs.rust_src == 'true'
run: sudo apt-get -y install protobuf-compiler
run: |
sudo apt-get update
sudo apt-get -y install protobuf-compiler
- name: Setup nightly Rust Toolchain (for rustfmt)
if: steps.modified.outputs.rust_src == 'true'
uses: dtolnay/rust-toolchain@f7ccc83f9ed1e5b9c81d8a67d7ad1a747e22a561 # master
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ui-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
task:
- name: Cypress run
command: |
sudo apt-get update
sudo apt-get -y install protobuf-compiler
CI=false yarn --cwd quickwit-ui build
RUSTFLAGS="--cfg tokio_unstable" cargo build --features=postgres
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub use crate::node_config::{
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
S3EncryptionConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig,
StorageConfigs,
};

/// Returns true if the ingest API v2 is enabled.
Expand Down
59 changes: 59 additions & 0 deletions quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,36 @@ impl fmt::Debug for AzureStorageConfig {
}
}

/// Server-side encryption settings for an S3-compatible object store.
///
/// Serialized with an internally-tagged `type` field in snake_case, so the
/// YAML/JSON form is `type: sse_c` followed by the variant's fields.
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum S3EncryptionConfig {
    /// Standard AES-256 SSE-C header config. `key` is expected to be a
    /// 256-bit base64-encoded string, and `key_md5` is expected to be the
    /// base64-encoded MD5 digest of the (binary) key. NOTE: Akamai gen1
    /// buckets don't respect this (they expect only a 32-hex-char key).
    SseC {
        // Base64-encoded 256-bit customer-provided encryption key.
        key: String,
        // Base64-encoded MD5 digest of the raw (binary) key.
        key_md5: String,
        // NOTE(review): presumably restricts this key to read paths only —
        // confirm against the storage layer that consumes this config.
        read_only: bool,
    },
}

impl fmt::Debug for S3EncryptionConfig {
    /// Debug representation that never leaks the customer encryption key:
    /// the `key` field is always printed redacted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Single-variant enum: an irrefutable `let` destructure suffices.
        let S3EncryptionConfig::SseC {
            key: _,
            key_md5,
            read_only,
        } = self;
        let mut debug_struct = f.debug_struct("S3EncryptionConfig");
        debug_struct.field("type", &"sse_c");
        debug_struct.field("key", &"***redacted***");
        debug_struct.field("key_md5", key_md5);
        debug_struct.field("read_only", read_only);
        debug_struct.finish()
    }
}

#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct S3StorageConfig {
Expand All @@ -329,6 +359,8 @@ pub struct S3StorageConfig {
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
pub encryption: Option<S3EncryptionConfig>,
}

impl S3StorageConfig {
Expand Down Expand Up @@ -685,4 +717,31 @@ mod tests {
assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO));
}
}

/// Checks that `S3StorageConfig::encryption` deserializes from YAML: an
/// `sse_c`-tagged mapping must become `S3EncryptionConfig::SseC` with all
/// three fields populated, alongside the other storage settings.
#[test]
fn test_storage_s3_config_encryption_serde() {
{
// The YAML relies on serde's internally-tagged representation
// (`type: sse_c`) declared on `S3EncryptionConfig`.
let s3_storage_config_yaml = r#"
endpoint: http://localhost:4566
encryption:
type: sse_c
key: test-customer-key
key_md5: test-customer-key-md5
read_only: true
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();

// All fields not present in the YAML keep their `Default` values.
let expected_s3_config = S3StorageConfig {
endpoint: Some("http://localhost:4566".to_string()),
encryption: Some(S3EncryptionConfig::SseC {
key: "test-customer-key".to_string(),
key_md5: "test-customer-key-md5".to_string(),
read_only: true,
}),
..Default::default()
};
assert_eq!(s3_storage_config, expected_s3_config);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct TermQueryParams {
case_insensitive: bool,
}

#[cfg(test)]
pub fn term_query_from_field_value(field: impl ToString, value: impl ToString) -> TermQuery {
TermQuery {
field: field.to_string(),
Expand Down
23 changes: 11 additions & 12 deletions quickwit/quickwit-query/src/elastic_query_dsl/terms_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};

use serde::Deserialize;

use crate::elastic_query_dsl::bool_query::BoolQuery;
use crate::elastic_query_dsl::one_field_map::OneFieldMap;
use crate::elastic_query_dsl::term_query::term_query_from_field_value;
use crate::elastic_query_dsl::{ConvertibleToQueryAst, ElasticQueryDslInner};
use crate::not_nan_f32::NotNaNf32;
use crate::query_ast::QueryAst;
use crate::query_ast::{QueryAst, TermSetQuery};

#[derive(PartialEq, Eq, Debug, Deserialize, Clone)]
#[serde(try_from = "TermsQueryForSerialization")]
Expand Down Expand Up @@ -87,15 +87,14 @@ impl TryFrom<TermsQueryForSerialization> for TermsQuery {

impl ConvertibleToQueryAst for TermsQuery {
fn convert_to_query_ast(self) -> anyhow::Result<QueryAst> {
let term_queries: Vec<ElasticQueryDslInner> = self
.values
.into_iter()
.map(|value| term_query_from_field_value(self.field.clone(), value))
.map(ElasticQueryDslInner::from)
.collect();
let mut union = BoolQuery::union(term_queries);
union.boost = self.boost;
union.convert_to_query_ast()
let mut terms_per_field = HashMap::new();
let values_set: BTreeSet<String> = self.values.into_iter().collect();
terms_per_field.insert(self.field, values_set);

let term_set_query = TermSetQuery { terms_per_field };
let query_ast: QueryAst = term_set_query.into();

Ok(query_ast.boost(self.boost))
}
}

Expand Down
121 changes: 117 additions & 4 deletions quickwit/quickwit-query/src/query_ast/term_set_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tantivy::Term;

use crate::InvalidQuery;
use crate::query_ast::{
BuildTantivyAst, BuildTantivyAstContext, QueryAst, TantivyQueryAst, TermQuery,
BoolQuery, BuildTantivyAst, BuildTantivyAstContext, QueryAst, TantivyQueryAst, TermQuery,
};

/// TermSetQuery matches the same document set as if it was a union of
Expand All @@ -32,6 +32,53 @@ pub struct TermSetQuery {
}

impl TermSetQuery {
/// Returns true if any field referenced by this query resolves to a schema
/// entry that is fast-only (fast but not indexed). The caller uses this to
/// decide between the tantivy `TermSetQuery` path and the bool-query
/// fallback (see `build_tantivy_ast_impl`).
fn has_fast_only_field(&self, context: &BuildTantivyAstContext) -> bool {
    self.terms_per_field.keys().any(|full_path| {
        match super::utils::find_field_or_hit_dynamic(full_path, context.schema) {
            Some((_, field_entry, _)) => field_entry.is_fast() && !field_entry.is_indexed(),
            // Unresolvable fields never force the fallback.
            None => false,
        }
    })
}

/// Builds a disjunction of one `TermQuery` per (field, value) pair and
/// lowers it to a tantivy AST. This is the fallback used when a referenced
/// field is fast-only and tantivy's native `TermSetQuery` does not apply.
fn build_bool_query(
    &self,
    context: &BuildTantivyAstContext,
) -> Result<TantivyQueryAst, InvalidQuery> {
    // One `should` clause per value of every field.
    let mut should = Vec::new();
    for (full_path, values) in &self.terms_per_field {
        for value in values {
            should.push(QueryAst::Term(TermQuery {
                field: full_path.to_string(),
                value: value.to_string(),
            }));
        }
    }
    BoolQuery {
        should,
        ..Default::default()
    }
    .build_tantivy_ast_impl(context)
}

/// Lowers this query to tantivy's native `TermSetQuery`, built from the
/// per-field term iterator. Errors surface from term construction.
fn build_term_set_query(
    &self,
    context: &BuildTantivyAstContext,
) -> Result<TantivyQueryAst, InvalidQuery> {
    let terms = self.make_term_iterator(context)?;
    Ok(tantivy::query::TermSetQuery::new(terms).into())
}

fn make_term_iterator(
&self,
context: &BuildTantivyAstContext,
Expand Down Expand Up @@ -67,9 +114,11 @@ impl BuildTantivyAst for TermSetQuery {
&self,
context: &BuildTantivyAstContext,
) -> Result<TantivyQueryAst, InvalidQuery> {
let terms_it = self.make_term_iterator(context)?;
let term_set_query = tantivy::query::TermSetQuery::new(terms_it);
Ok(term_set_query.into())
if self.has_fast_only_field(context) {
self.build_bool_query(context)
} else {
self.build_term_set_query(context)
}
}
}

Expand All @@ -78,3 +127,67 @@ impl From<TermSetQuery> for QueryAst {
QueryAst::TermSet(term_set_query)
}
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use tantivy::schema::{FAST, INDEXED, Schema};

    use super::TermSetQuery;
    use crate::query_ast::{BuildTantivyAst, BuildTantivyAstContext};

    /// A field that is fast but not indexed cannot be served by tantivy's
    /// `TermSetQuery`; lowering must fall back to a bool query with one
    /// `should` clause per value.
    #[test]
    fn test_term_set_query_with_fast_only_field_returns_bool_query() {
        let mut schema_builder = Schema::builder();
        // FAST only: no inverted index exists for this field.
        schema_builder.add_u64_field("fast_field", FAST);
        let schema = schema_builder.build();

        let terms_per_field = HashMap::from([(
            "fast_field".to_string(),
            BTreeSet::from(["1".to_string(), "2".to_string()]),
        )]);
        let term_set_query = TermSetQuery { terms_per_field };

        let tantivy_query_ast = term_set_query
            .build_tantivy_ast_call(&BuildTantivyAstContext::for_test(&schema))
            .unwrap();

        let bool_query = tantivy_query_ast
            .as_bool_query()
            .expect("Expected BoolQuery for fast-only field, but got a different query type");
        // Exactly one `should` clause per distinct value, and nothing else.
        assert_eq!(bool_query.should.len(), 2);
        assert_eq!(bool_query.must.len(), 0);
        assert_eq!(bool_query.must_not.len(), 0);
        assert_eq!(bool_query.filter.len(), 0);
    }

    /// An indexed field takes the efficient path: lowering produces tantivy's
    /// native `TermSetQuery` as a single leaf node.
    #[test]
    fn test_term_set_query_with_indexed_field_uses_term_set() {
        let mut schema_builder = Schema::builder();
        schema_builder.add_u64_field("indexed_field", FAST | INDEXED);
        let schema = schema_builder.build();

        let terms_per_field = HashMap::from([(
            "indexed_field".to_string(),
            BTreeSet::from(["1".to_string(), "2".to_string()]),
        )]);
        let term_set_query = TermSetQuery { terms_per_field };

        let tantivy_query_ast = term_set_query
            .build_tantivy_ast_call(&BuildTantivyAstContext::for_test(&schema))
            .unwrap();

        // Should return a leaf query (TermSetQuery wrapped in TantivyQueryAst)
        let leaf = tantivy_query_ast
            .as_leaf()
            .expect("Expected a leaf query (TermSetQuery), but got a complex query");

        // Verify it's a TermSetQuery by checking the debug representation
        let debug_str = format!("{leaf:?}");
        assert!(
            debug_str.contains("TermSetQuery"),
            "Expected TermSetQuery, got: {debug_str}"
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,17 @@ impl<'de> Visitor<'de> for FieldSortVecVisitor {
type Value = Vec<SortField>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("An array containing the sort fields.")
formatter.write_str("A string, array, or object containing the sort fields.")
}

/// Accepts the shorthand `"sort": "<field>"` form: a bare string becomes a
/// single sort field using the Elasticsearch default order for that field
/// name, with no date format.
fn visit_str<E>(self, field_name: &str) -> Result<Vec<SortField>, E>
where E: serde::de::Error {
    let sort_field = SortField {
        field: field_name.to_string(),
        order: default_elasticsearch_sort_order(field_name),
        date_format: None,
    };
    Ok(vec![sort_field])
}

fn visit_seq<A>(self, mut seq: A) -> Result<Vec<SortField>, A::Error>
Expand Down Expand Up @@ -231,6 +241,20 @@ mod tests {
assert_eq!(field_sorts[1].order, SortOrder::Asc);
}

/// Checks the string shorthand for `sort`: a bare field name deserializes
/// into a single `SortField`, and a plain field like "timestamp" gets the
/// ascending default order.
#[test]
fn test_sort_field_str() {
let json = r#"
{
"sort": "timestamp"
}
"#;
let search_body: SearchBody = serde_json::from_str(json).unwrap();
let field_sorts = search_body.sort.unwrap();
assert_eq!(field_sorts.len(), 1);
assert_eq!(field_sorts[0].field, "timestamp");
// Non-`_score` fields default to ascending here.
assert_eq!(field_sorts[0].order, SortOrder::Asc);
}

#[test]
fn test_sort_default_orders() {
let json = r#"
Expand Down
Loading
Loading