diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index b452ef78c85..01f634a5523 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -28,7 +28,7 @@ use crate::vector::VectorIndex; use crate::{Index, IndexType}; use arrow::array::{AsArray, UInt32Builder}; use arrow::datatypes::{UInt32Type, UInt64Type}; -use arrow_array::{BinaryArray, RecordBatch, UInt32Array}; +use arrow_array::{BinaryArray, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; @@ -56,6 +56,7 @@ mod ngram_regex; pub(crate) use ngram_regex::regex_can_use_index; const TOKENS_COL: &str = "tokens"; +const CARDINALITY_COL: &str = "cardinality"; const POSTING_LIST_COL: &str = "posting_list"; const POSTINGS_FILENAME: &str = "ngram_postings.lance"; const NGRAM_INDEX_VERSION: u32 = 0; @@ -64,11 +65,14 @@ use std::sync::LazyLock; pub static TOKENS_FIELD: LazyLock = LazyLock::new(|| Field::new(TOKENS_COL, DataType::UInt32, true)); +pub static CARDINALITY_FIELD: LazyLock = + LazyLock::new(|| Field::new(CARDINALITY_COL, DataType::UInt64, false)); pub static POSTINGS_FIELD: LazyLock = LazyLock::new(|| Field::new(POSTING_LIST_COL, DataType::Binary, false)); pub static POSTINGS_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(Schema::new(vec![ TOKENS_FIELD.clone(), + CARDINALITY_FIELD.clone(), POSTINGS_FIELD.clone(), ])) }); @@ -269,6 +273,9 @@ impl NGramPostingListReader { pub struct NGramIndex { /// The mapping from tokens to row offsets tokens: HashMap, + /// The number of rows in each token's posting list. Older NGram indices do + /// not have this metadata and fall back to loading every required token. + token_cardinalities: Option>, /// The reader for the posting lists list_reader: Arc, /// The tokenizer used to tokenize text. Note: not all tokenizers can be used with this index. For @@ -293,6 +300,7 @@ impl std::fmt::Debug for NGramIndex { impl DeepSizeOf for NGramIndex { fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize { self.tokens.deep_size_of_children(context) + + self.token_cardinalities.deep_size_of_children(context) } } @@ -302,9 +310,16 @@ impl NGramIndex { frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result { - let tokens = store.open_index_file(POSTINGS_FILENAME).await?; - let tokens = tokens - .read_range(0..tokens.num_rows(), Some(&[TOKENS_COL])) + let tokens_reader = store.open_index_file(POSTINGS_FILENAME).await?; + let token_count = tokens_reader.num_rows(); + let has_cardinality = tokens_reader.schema().field(CARDINALITY_COL).is_some(); + let projection: &[&str] = if has_cardinality { + &[TOKENS_COL, CARDINALITY_COL] + } else { + &[TOKENS_COL] + }; + let tokens = tokens_reader + .read_range(0..token_count, Some(projection)) .await?; let tokens_map = HashMap::from_iter( @@ -317,6 +332,30 @@ impl NGramIndex { .enumerate() .map(|(idx, token)| (token, idx as u32)), ); + let token_cardinalities = if has_cardinality { + let cardinalities = tokens + .column_by_name(CARDINALITY_COL) + .expect("cardinality column should be present"); + Some( + tokens + .column_by_name(TOKENS_COL) + .expect("tokens column should be present") + .as_primitive::() + .values() + .iter() + .copied() + .zip( + cardinalities + .as_primitive::() + .values() + .iter() + .copied(), + ) + .collect(), + ) + } else { + None + }; let posting_reader = Arc::new(NGramPostingListReader { reader: store.open_index_file(POSTINGS_FILENAME).await?, @@ -327,6 +366,7 @@ impl NGramIndex { Ok(Self { io_parallelism: store.io_parallelism(), tokens: tokens_map, + token_cardinalities, list_reader: posting_reader, tokenizer: NGRAM_TOKENIZER.clone(), store, @@ -343,7 +383,10 @@ impl NGramIndex { .expect_ok()? .as_binary::(); - let new_posting_lists = posting_lists_array + // The input batch may come from either the old two-column format or the + // current three-column format. Remap only needs tokens and postings; + // the destination is always rewritten in the current format. + let remapped_posting_lists = posting_lists_array .iter() .map(|posting_list| { let posting_list = posting_list.unwrap(); @@ -356,18 +399,23 @@ impl NGramIndex { None => Some(row_id), } })); + let cardinality = new_posting_list.len(); let mut buf = Vec::with_capacity(new_posting_list.serialized_size()); new_posting_list.serialize_into(&mut buf)?; - Ok(buf) + Ok((buf, cardinality)) }) .collect::>>()?; + let (new_posting_lists, cardinalities): (Vec<_>, Vec<_>) = + remapped_posting_lists.into_iter().unzip(); let new_posting_lists_array = BinaryArray::from_iter_values(new_posting_lists); + let cardinalities = UInt64Array::from_iter_values(cardinalities); Ok(RecordBatch::try_new( POSTINGS_SCHEMA.clone(), vec![ batch.column_by_name(TOKENS_COL).expect_ok()?.clone(), + Arc::new(cardinalities), Arc::new(new_posting_lists_array), ], )?) @@ -385,6 +433,62 @@ impl NGramIndex { Self::from_store(store, frag_reuse_index, index_cache).await?, )) } + + fn regex_query_is_conjunction(query: &ngram_regex::TrigramQuery) -> bool { + match query { + ngram_regex::TrigramQuery::Trigram(_) => true, + ngram_regex::TrigramQuery::And(items) => { + items.iter().all(Self::regex_query_is_conjunction) + } + ngram_regex::TrigramQuery::All | ngram_regex::TrigramQuery::None => true, + ngram_regex::TrigramQuery::Or(_) => false, + } + } + + fn sorted_tokens_for_conjunction(&self, tokens: HashSet) -> Vec { + let mut tokens = tokens.into_iter().collect::>(); + let Some(cardinalities) = &self.token_cardinalities else { + return tokens; + }; + // This metadata sort is cheap and can discover missing required tokens + // before any posting list I/O. All posting lists are still loaded in + // this first version; a future top-k planner can stop after enough rare + // tokens have been selected. + tokens.sort_by_key(|token| (cardinalities.get(token).copied().unwrap_or(0), *token)); + tokens + } + + async fn search_regex_conjunction( + &self, + tokens: HashSet, + metrics: &dyn MetricsCollector, + ) -> Result { + let tokens = self.sorted_tokens_for_conjunction(tokens); + let mut row_offsets = Vec::with_capacity(tokens.len()); + for token in tokens { + let Some(row_offset) = self.tokens.get(&token) else { + return Ok(SearchResult::at_most(RowAddrTreeMap::new())); + }; + row_offsets.push(*row_offset); + } + let posting_lists = futures::stream::iter( + row_offsets + .into_iter() + .map(|row_offset| self.list_reader.ngram_list(row_offset, metrics)), + ) + .buffer_unordered(self.io_parallelism) + .try_collect::>() + .await?; + metrics.record_comparisons(posting_lists.len()); + let mut posting_lists = posting_lists; + // `buffer_unordered` starts reads from the rare-first offset list, but + // collects them in completion order. Re-sort the loaded bitmaps so the + // CPU intersection still clones and intersects the smallest list first. + posting_lists.sort_by_key(|list| list.bitmap.len()); + let list_refs = posting_lists.iter().map(|list| list.as_ref()); + let row_ids = NGramPostingList::intersect(list_refs); + Ok(SearchResult::at_most(RowAddrTreeMap::from(row_ids))) + } } #[async_trait] @@ -497,6 +601,9 @@ impl ScalarIndex for NGramIndex { _ => { let mut tokens = HashSet::new(); ngram_regex::collect_tokens(&trigram_query, &mut tokens); + if Self::regex_query_is_conjunction(&trigram_query) { + return self.search_regex_conjunction(tokens, metrics).await; + } // Fetch the posting list for every trigram the condition // references; a token absent from the index contributes // an empty list, which `eval_trigram_query` handles. @@ -661,6 +768,8 @@ impl NGramIndexSpillState { } fn try_into_batch(self) -> Result { + let cardinalities = + UInt64Array::from_iter_values(self.bitmaps.iter().map(RoaringTreemap::len)); let bitmap_array = BinaryArray::from_iter_values(self.bitmaps.into_iter().map(|bitmap| { let mut buf = Vec::with_capacity(bitmap.serialized_size()); bitmap.serialize_into(&mut buf).unwrap(); @@ -668,7 +777,11 @@ impl NGramIndexSpillState { })); Ok(RecordBatch::try_new( POSTINGS_SCHEMA.clone(), - vec![Arc::new(self.tokens), Arc::new(bitmap_array)], + vec![ + Arc::new(self.tokens), + Arc::new(cardinalities), + Arc::new(bitmap_array), + ], )?) } } @@ -1373,7 +1486,8 @@ mod tests { }; use arrow::datatypes::UInt64Type; - use arrow_array::{Array, RecordBatch, StringArray, UInt64Array}; + use arrow_array::cast::AsArray; + use arrow_array::{Array, BinaryArray, RecordBatch, StringArray, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion::{ execution::SendableRecordBatchStream, physical_plan::stream::RecordBatchStreamAdapter, @@ -1388,13 +1502,20 @@ mod tests { use lance_tokenizer::TextAnalyzer; use crate::scalar::{ - ScalarIndex, SearchResult, TextQuery, + IndexStore, ScalarIndex, SearchResult, TextQuery, lance_format::LanceIndexStore, ngram::{NGramIndex, NGramIndexBuilder, NGramIndexBuilderOptions}, }; - use crate::{metrics::NoOpMetricsCollector, scalar::registry::VALUE_COLUMN_NAME}; + use crate::{ + metrics::{LocalMetricsCollector, NoOpMetricsCollector}, + scalar::registry::VALUE_COLUMN_NAME, + }; - use super::{NGRAM_TOKENIZER, ngram_to_token, tokenize_visitor}; + use super::{ + CARDINALITY_COL, NGRAM_N, NGRAM_TOKENIZER, POSTING_LIST_COL, POSTINGS_FILENAME, TOKENS_COL, + ngram_to_token, tokenize_visitor, + }; + use roaring::RoaringTreemap; fn collect_tokens(analyzer: &TextAnalyzer, text: &str) -> Vec { let mut tokens = Vec::with_capacity(text.len() * 3); @@ -1487,6 +1608,28 @@ mod tests { list.bitmap.iter().sorted().collect() } + async fn read_cardinality_for_trigram(index: &NGramIndex, trigram: &str) -> u64 { + let token = ngram_to_token(trigram, NGRAM_N); + let row_offset = index.tokens[&token]; + let reader = index + .store + .open_index_file(POSTINGS_FILENAME) + .await + .unwrap(); + let batch = reader + .read_range( + row_offset as usize..row_offset as usize + 1, + Some(&[CARDINALITY_COL]), + ) + .await + .unwrap(); + batch + .column_by_name(CARDINALITY_COL) + .unwrap() + .as_primitive::() + .value(0) + } + #[test_log::test(tokio::test)] async fn test_basic_ngram_index() { let data = StringArray::from_iter_values([ @@ -1687,6 +1830,119 @@ mod tests { ); } + #[test_log::test(tokio::test)] + async fn test_ngram_writes_cardinality_metadata() { + let data = StringArray::from_iter_values(["cat", "cat dog", "dog"]); + let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64)); + let schema = Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + Field::new(ROW_ID, DataType::UInt64, false), + ])); + let data = + RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap(); + let data = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(data))), + )); + + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default()).unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + + assert!(index.token_cardinalities.is_some()); + assert_eq!(read_cardinality_for_trigram(&index, "cat").await, 2); + assert_eq!(read_cardinality_for_trigram(&index, "dog").await, 2); + } + + #[test_log::test(tokio::test)] + async fn test_ngram_regex_sorts_large_and_by_cardinality() { + let rows = (0..64).map(|i| { + if i == 7 { + "commoncommon raremarker".to_string() + } else { + "commoncommon".to_string() + } + }); + let data = StringArray::from_iter_values(rows); + let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64)); + let schema = Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + Field::new(ROW_ID, DataType::UInt64, false), + ])); + let data = + RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap(); + let data = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(data))), + )); + + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default()).unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + let metrics = LocalMetricsCollector::default(); + let result = index + .search( + &TextQuery::Regex("commoncommon.*raremarker".to_string()), + &metrics, + ) + .await + .unwrap(); + + assert_eq!( + result, + SearchResult::at_most(RowAddrTreeMap::from_iter([7])) + ); + assert!( + metrics + .comparisons + .load(std::sync::atomic::Ordering::Relaxed) + > 8, + "large conjunction should keep all required trigrams to avoid broadening candidates" + ); + } + + #[test_log::test(tokio::test)] + async fn test_ngram_loads_old_two_column_postings() { + let tmpdir = Arc::new(TempDir::default()); + let test_store = LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + ); + let old_schema = Arc::new(Schema::new(vec![ + Field::new(TOKENS_COL, DataType::UInt32, true), + Field::new(POSTING_LIST_COL, DataType::Binary, false), + ])); + let token = ngram_to_token("cat", NGRAM_N); + let bitmap = RoaringTreemap::from_iter([42]); + let mut bitmap_bytes = Vec::new(); + bitmap.serialize_into(&mut bitmap_bytes).unwrap(); + let batch = RecordBatch::try_new( + old_schema.clone(), + vec![ + Arc::new(UInt32Array::from_iter_values([token])), + Arc::new(BinaryArray::from_iter_values([bitmap_bytes])), + ], + ) + .unwrap(); + let mut writer = test_store + .new_index_file(POSTINGS_FILENAME, old_schema) + .await + .unwrap(); + writer.write_record_batch(batch).await.unwrap(); + writer.finish().await.unwrap(); + + let index = NGramIndex::from_store(Arc::new(test_store), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.token_cardinalities, None); + assert_eq!( + index + .search(&TextQuery::Regex("cat".to_string()), &NoOpMetricsCollector) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([42])) + ); + } + fn test_data_schema() -> Arc { Arc::new(Schema::new(vec![ Field::new(VALUE_COLUMN_NAME, DataType::Utf8, true), diff --git a/rust/lance/benches/regex_ngram.rs b/rust/lance/benches/regex_ngram.rs index 76f597ad9cb..126b65d4394 100644 --- a/rust/lance/benches/regex_ngram.rs +++ b/rust/lance/benches/regex_ngram.rs @@ -6,15 +6,17 @@ //! Each query is a `regexp_match(doc, '...')` filter against a dataset that has //! an NGram index on `doc`. The query set spans a selective AND pattern, an //! alternation, a plain literal (rewritten to an infix LIKE before it reaches -//! the index), and a deliberately non-accelerable pattern (`a.b`, which yields -//! no trigram) that serves as a regression guard. +//! the index), skewed conjunctions with common / rare / missing trigrams, and a +//! deliberately non-accelerable pattern (`a.b`, which yields no trigram) that +//! serves as a regression guard. //! -//! On `main` none of these use the index (regex falls through to a full scan + -//! recheck); with the ngram-regex acceleration the index prunes candidates for -//! the first three while `a.b` stays a full scan. Capture a baseline on `main` -//! with `--save-baseline before_7130`, then compare after the change with -//! `--baseline before_7130`. +//! This benchmark covers regex-to-NGram acceleration for indexable patterns and +//! includes skewed conjunction cases where posting-list cardinality affects +//! query planning. The `a.b` case stays a full scan because it yields no +//! trigram. Set `LANCE_REGEX_NGRAM_TOTAL` to scale the dataset beyond the +//! default 200k rows. +use std::env; use std::hint::black_box; use std::sync::Arc; use std::time::Duration; @@ -35,34 +37,56 @@ use lance_testing::pprof::{Output, PProfProfiler}; const TOTAL: usize = 200_000; -/// Build the `doc` column: random sentences with rare markers injected into a -/// small fraction of rows so the regex queries have controlled selectivity. -/// The markers (`zqxwvu`, `needlexyz`, `qwerasdf`) are unlikely to appear in -/// the generated English-word sentences. -fn build_docs() -> StringArray { +fn total_rows() -> usize { + env::var("LANCE_REGEX_NGRAM_TOTAL") + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(TOTAL) +} + +/// Build the `doc` column: random sentences with markers injected into rows so +/// the regex queries have controlled selectivity. The markers are unlikely to +/// appear in the generated English-word sentences. +fn build_docs(total: usize) -> StringArray { let mut sentence_gen = array::random_sentence(1, 30, false); let base = sentence_gen - .generate_default(RowCount::from(TOTAL as u64)) + .generate_default(RowCount::from(total as u64)) .unwrap(); let base = base.as_string::(); - let docs = (0..TOTAL).map(|i| { + let docs = (0..total).map(|i| { let sentence = base.value(i); + let mut doc = sentence.to_string(); if i % 200 == 0 { // ~0.5% of rows match `zqxwvu.*needlexyz` and `zqxwvu`. - format!("{sentence} zqxwvu needlexyz") + doc.push_str(" zqxwvu needlexyz"); } else if i % 211 == 0 { // A second marker for the alternation query. - format!("{sentence} qwerasdf") - } else { - sentence.to_string() + doc.push_str(" qwerasdf"); + } + if i % 2 == 0 { + // A deliberately common marker used to benchmark rare-token selection + // in large regex conjunctions. + doc.push_str(" commoncommon"); + } + doc.push_str(" commonmarkerabcdefghijklmnopqrstuvwx"); + if i % 997 == 0 { + doc.push_str(" raremarker"); } + if i % 3 == 0 { + doc.push_str(" densecommonprefix densecommonsuffix"); + } + if i % 1501 == 0 { + doc.push_str(" longrareanchor"); + } + doc }); StringArray::from_iter_values(docs) } async fn build_dataset(tempdir: &TempStrDir) -> Arc { let schema = Arc::new(Schema::new(vec![Field::new("doc", DataType::Utf8, false)])); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(build_docs())]).unwrap(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(build_docs(total_rows()))]).unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); let mut dataset = Dataset::write(reader, tempdir.as_str(), None) @@ -101,6 +125,26 @@ fn bench_regex_ngram(c: &mut Criterion) { "regexp_match(doc, '(zqxwvu|qwerasdf|needlexyz)')", ), ("plain_literal", "regexp_match(doc, 'zqxwvu')"), + ( + // Common required trigram plus a rare required trigram. + "skewed_and", + "regexp_match(doc, 'commoncommon.*raremarker')", + ), + ( + // Several required trigrams, including one rare anchor. + "many_required_trigrams", + "regexp_match(doc, 'densecommonprefix.*longrareanchor.*densecommonsuffix')", + ), + ( + // One common required trigram and one missing required trigram. + "missing_required_trigram", + "regexp_match(doc, 'commoncommon.*missingmarker')", + ), + ( + // Many common required trigrams followed by a missing required trigram. + "many_common_missing_trigram", + "regexp_match(doc, 'commonmarkerabcdefghijklmnopqrstuvwx.*missingmarker')", + ), ("non_accelerable_a_dot_b", "regexp_match(doc, 'a.b')"), ];