From 0c75d81cfe6e347913634805373e3b080fed4bfa Mon Sep 17 00:00:00 2001 From: fanng <“fanng@apache.org”> Date: Fri, 19 Jun 2026 12:32:05 +0800 Subject: [PATCH 1/4] Remove misleading LanceDB naming --- daft_lance/_lance.py | 52 +++++++++---------- daft_lance/lance_scan.py | 24 ++++----- tests/io/{lancedb => lance}/__init__.py | 0 tests/io/{lancedb => lance}/conftest.py | 0 .../{lancedb => lance}/test_blob_v2_policy.py | 0 .../test_expression_point_lookup.py | 0 .../test_fast_path_merge.py | 0 .../{lancedb => lance}/test_lance_batching.py | 0 .../test_lance_blob_v2_e2e.py | 0 .../test_lance_blob_v2_read.py | 0 .../test_lance_blob_v2_write.py | 0 .../test_lance_compaction.py} | 0 .../test_lance_count_pushdown_coverage.py} | 14 ++--- .../test_lance_custom_task.py} | 0 .../test_lance_data_sink_internals.py | 0 .../test_lance_data_sink_storage_versions.py | 0 .../test_lance_factory_function.py} | 10 ++-- .../test_lance_merge_evolution.py | 0 .../test_lance_point_lookup.py} | 4 +- .../test_lance_reads.py} | 44 ++++++++-------- .../test_lance_scalar_index.py} | 6 +-- .../test_lance_vector_search.py} | 0 .../test_lance_writes.py} | 14 ++--- .../{lancedb => lance}/test_mem_wal_writes.py | 0 24 files changed, 84 insertions(+), 84 deletions(-) rename tests/io/{lancedb => lance}/__init__.py (100%) rename tests/io/{lancedb => lance}/conftest.py (100%) rename tests/io/{lancedb => lance}/test_blob_v2_policy.py (100%) rename tests/io/{lancedb => lance}/test_expression_point_lookup.py (100%) rename tests/io/{lancedb => lance}/test_fast_path_merge.py (100%) rename tests/io/{lancedb => lance}/test_lance_batching.py (100%) rename tests/io/{lancedb => lance}/test_lance_blob_v2_e2e.py (100%) rename tests/io/{lancedb => lance}/test_lance_blob_v2_read.py (100%) rename tests/io/{lancedb => lance}/test_lance_blob_v2_write.py (100%) rename tests/io/{lancedb/test_lancedb_compaction.py => lance/test_lance_compaction.py} (100%) rename tests/io/{lancedb/test_lancedb_count_pushdown_coverage.py => lance/test_lance_count_pushdown_coverage.py} (88%) rename tests/io/{lancedb/test_lancedb_custom_task.py => lance/test_lance_custom_task.py} (100%) rename tests/io/{lancedb => lance}/test_lance_data_sink_internals.py (100%) rename tests/io/{lancedb => lance}/test_lance_data_sink_storage_versions.py (100%) rename tests/io/{lancedb/test_lancedb_factory_function.py => lance/test_lance_factory_function.py} (95%) rename tests/io/{lancedb => lance}/test_lance_merge_evolution.py (100%) rename tests/io/{lancedb/test_lancedb_point_lookup.py => lance/test_lance_point_lookup.py} (97%) rename tests/io/{lancedb/test_lancedb_reads.py => lance/test_lance_reads.py} (88%) rename tests/io/{lancedb/test_lancedb_scalar_index.py => lance/test_lance_scalar_index.py} (99%) rename tests/io/{lancedb/test_lancedb_vector_search.py => lance/test_lance_vector_search.py} (100%) rename tests/io/{lancedb/test_lancedb_writes.py => lance/test_lance_writes.py} (96%) rename tests/io/{lancedb => lance}/test_mem_wal_writes.py (100%) diff --git a/daft_lance/_lance.py b/daft_lance/_lance.py index 21b7c67..18c291a 100644 --- a/daft_lance/_lance.py +++ b/daft_lance/_lance.py @@ -17,7 +17,7 @@ from .lance_compaction import compact_files_internal from .lance_merge_column import merge_columns_from_df, merge_columns_internal from .lance_scalar_index import create_scalar_index_internal -from .lance_scan import LanceDBScanOperator +from .lance_scan import LanceScanOperator from .utils import construct_lance_dataset if TYPE_CHECKING: @@ -43,12 +43,12 @@ def read_lance( include_fragment_id: bool | None = None, checkpoint: CheckpointConfig | None = None, ) -> DataFrame: - """Create a DataFrame from a LanceDB table. + """Create a DataFrame from a Lance dataset. Args: uri: The URI of the Lance table to read from. Accepts a local path or an object-store URI like "s3://bucket/path". - io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None. + io_config: A custom IOConfig to use when accessing Lance data. Defaults to None. version : optional, int | str If specified, load a specific version of the Lance dataset. Else, loads the latest version. A version number (`int`) or a tag (`str`) can be provided. @@ -99,25 +99,25 @@ def read_lance( already exists in the store are skipped on re-run. Requires the Ray runner. Returns: - DataFrame: a DataFrame with the schema converted from the specified LanceDB table + DataFrame: a DataFrame with the schema converted from the specified Lance dataset - This function requires the use of [LanceDB](https://lancedb.github.io/lancedb/), which is the Python library for the LanceDB project. + This function reads Lance datasets via the Lance Python package. To ensure that this is installed with Daft, you may install: `pip install daft[lance]` Examples: - Read a local LanceDB table: + Read a local Lance dataset: >>> df = daft.read_lance("/path/to/lance/data/") >>> df.show() - Read a LanceDB table and specify a version: + Read a Lance dataset and specify a version: >>> df = daft.read_lance("/path/to/lance/data/", version=1) >>> df.show() - Read a LanceDB table with fragment grouping: + Read a Lance dataset with fragment grouping: >>> df = daft.read_lance("/path/to/lance/data/", fragment_group_size=5) >>> df.show() - Read a LanceDB table from a public S3 bucket: + Read a Lance dataset from a public S3 bucket: >>> from daft.io import S3Config, IOConfig >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True)) >>> df = daft.read_lance("s3://daft-oss-public-data/lance/words-test-dataset", io_config=io_config) @@ -147,7 +147,7 @@ def read_lance( metadata_cache_size_bytes=metadata_cache_size_bytes, ) - lance_operator = LanceDBScanOperator( + lance_operator = LanceScanOperator( ds, fragment_group_size=fragment_group_size, include_fragment_id=include_fragment_id, @@ -178,14 +178,14 @@ def merge_columns( default_scan_options: dict[str, Any] | None = None, metadata_cache_size_bytes: int | None = None, ) -> LanceDataset: - """Merge new columns into a LanceDB table using a transformation function. + """Merge new columns into a Lance dataset using a transformation function. - This function modifies the LanceDB table in-place by adding new columns computed + This function modifies the Lance dataset in-place by adding new columns computed from existing data using a transformation function. It does not return a DataFrame. Args: uri: The URI of the Lance table (supports remote URLs to object stores such as `s3://` or `gs://`) - io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None. + io_config: A custom IOConfig to use when accessing Lance data. Defaults to None. transform: A transformation function or UDF to apply to the data. read_columns: List of column names to read for the transformation. reader_schema: Schema for the reader. @@ -204,17 +204,17 @@ def merge_columns( None: This function modifies the table in-place and does not return a value. Note: - This function requires the use of [LanceDB](https://lancedb.github.io/lancedb/), which is the Python library for the LanceDB project. + This function writes Lance datasets via the Lance Python package. To ensure that this is installed with Daft, you may install: `pip install daft[lance]` Examples: - Merge new columns into a local LanceDB table: + Merge new columns into a local Lance dataset: >>> def double_score(batch): ... # Example transformation function ... import pyarrow.compute as pc ... ... return batch.append_column("new_column", pc.multiply(batch["c"], 2)) - >>> daft_lance.merge_columns("s3://my-lancedb-bucket/data/", transform=double_score) + >>> daft_lance.merge_columns("s3://my-lance-bucket/data/", transform=double_score) """ if transform is None: raise ValueError( @@ -273,13 +273,13 @@ def merge_columns_df( ) -> Any: """Row-level merge columns entrypoint using a DataFrame. - This function modifies the LanceDB table in-place by merging new columns from a DataFrame + This function modifies the Lance dataset in-place by merging new columns from a DataFrame into existing fragments using a row-level join. It does not return a DataFrame. Args: df: DataFrame containing the new columns to merge along with fragment_id and join key columns - uri: URL to the LanceDB table (supports remote URLs to object stores such as `s3://` or `gs://`) - io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None. + uri: URL to the Lance dataset (supports remote URLs to object stores such as `s3://` or `gs://`) + io_config: A custom IOConfig to use when accessing Lance data. Defaults to None. read_columns: List of column names to read for the transformation. reader_schema: Schema for the reader. storage_options: Extra options for storage connection. @@ -300,22 +300,22 @@ def merge_columns_df( None: This function modifies the table in-place and does not return a value. Note: - This function requires the use of [LanceDB](https://lancedb.github.io/lancedb/), which is the Python library for the LanceDB project. + This function writes Lance datasets via the Lance Python package. To ensure that this is installed with Daft, you may install: `pip install daft[lance]` Examples: - Merge new columns into a local LanceDB table: + Merge new columns into a local Lance dataset: >>> import daft >>> # Read the existing table with row addresses >>> df = daft.read_lance( - ... "s3://my-lancedb-bucket/data/", + ... "s3://my-lance-bucket/data/", ... default_scan_options={"with_row_address": True}, ... include_fragment_id=True, ... ) >>> # Add new columns based on existing data >>> df = df.with_column("doubled_c", df["c"] * 2) >>> # Merge the new columns back to the table - >>> daft_lance.merge_columns_df(df, "s3://my-lancedb-bucket/data/") + >>> daft_lance.merge_columns_df(df, "s3://my-lance-bucket/data/") """ io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config storage_options = storage_options or io_config_to_storage_options(io_config, uri) @@ -385,7 +385,7 @@ def create_scalar_index( Args: uri: The URI of the Lance table (supports remote URLs to object stores such as `s3://` or `gs://`) - io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None. + io_config: A custom IOConfig to use when accessing Lance data. Defaults to None. column: Column name to index index_type: Type of index to build. For distributed execution this supports "INVERTED", "FTS", and "BTREE". @@ -424,7 +424,7 @@ def create_scalar_index( ImportError: If lance package is not available Note: - This function requires the use of [LanceDB](https://lancedb.github.io/lancedb/), which is the Python library for the LanceDB project. + This function writes Lance datasets via the Lance Python package. To ensure that this is installed with Daft, you may install: `pip install daft[lance]` Examples: @@ -510,7 +510,7 @@ def compact_files( Args: uri: The URI of the Lance table (supports remote URLs to object stores such as `s3://` or `gs://`) - io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None. + io_config: A custom IOConfig to use when accessing Lance data. Defaults to None. storage_options: Extra options for storage connection. version: If specified, load a specific version of the Lance dataset. asof: If specified, find the latest version created on or earlier than the given argument value. diff --git a/daft_lance/lance_scan.py b/daft_lance/lance_scan.py index de3349f..dbbc98d 100644 --- a/daft_lance/lance_scan.py +++ b/daft_lance/lance_scan.py @@ -23,7 +23,7 @@ # TODO support fts and fast_search -def _lancedb_table_factory_function( +def _lance_table_factory_function( ds_uri: str, open_kwargs: dict[Any, Any] | None = None, fragment_ids: list[int] | None = None, @@ -110,13 +110,13 @@ def _batches() -> Iterator[PyRecordBatch]: return _iter_batches() -def _lancedb_count_result_function( +def _lance_count_result_function( ds_uri: str, open_kwargs: dict[Any, Any] | None, required_column: str, filter: pa.compute.Expression | None = None, ) -> Iterator[PyRecordBatch]: - """Use LanceDB's API to count rows and return a record batch with the count result.""" + """Use Lance's API to count rows and return a record batch with the count result.""" ds = lance.dataset(ds_uri, **(open_kwargs or {})) logger.debug("Using metadata for counting all rows") count = ds.count_rows(filter=filter) @@ -128,7 +128,7 @@ def _lancedb_count_result_function( yield result_batch._recordbatch -class LanceDBScanOperator(ScanOperator, SupportsPushdownFilters): +class LanceScanOperator(ScanOperator, SupportsPushdownFilters): def __init__( self, ds: lance.LanceDataset, @@ -147,10 +147,10 @@ def __init__( self._schema = convert_lance_schema(base) def name(self) -> str: - return "LanceDBScanOperator" + return "LanceScanOperator" def display_name(self) -> str: - return f"LanceDBScanOperator({self._ds.uri})" + return f"LanceScanOperator({self._ds.uri})" def schema(self) -> Schema: return self._schema @@ -254,8 +254,8 @@ def _create_count_rows_scan_task(self, pushdowns: PyPushdowns) -> Iterator[ScanT new_schema = Schema.from_pyarrow_schema(pa.schema([pa.field(fields[0], pa.uint64())])) open_kwargs = getattr(self._ds, "_lance_open_kwargs", None) yield ScanTask.python_factory_func_scan_task( - module=_lancedb_count_result_function.__module__, - func_name=_lancedb_count_result_function.__name__, + module=_lance_count_result_function.__module__, + func_name=_lance_count_result_function.__name__, func_args=(self._ds.uri, open_kwargs, fields[0], self._combine_filters_to_arrow()), schema=new_schema._schema, num_rows=1, @@ -293,8 +293,8 @@ def _create_scan_tasks_with_limit_and_no_filters( task_schema = self._schema yield ScanTask.python_factory_func_scan_task( - module=_lancedb_table_factory_function.__module__, - func_name=_lancedb_table_factory_function.__name__, + module=_lance_table_factory_function.__module__, + func_name=_lance_table_factory_function.__name__, func_args=( self._ds.uri, open_kwargs, @@ -328,8 +328,8 @@ def _python_factory_func_scan_task( size_bytes: int | None = None, ) -> ScanTask: return ScanTask.python_factory_func_scan_task( - module=_lancedb_table_factory_function.__module__, - func_name=_lancedb_table_factory_function.__name__, + module=_lance_table_factory_function.__module__, + func_name=_lance_table_factory_function.__name__, func_args=( self._ds.uri, open_kwargs, diff --git a/tests/io/lancedb/__init__.py b/tests/io/lance/__init__.py similarity index 100% rename from tests/io/lancedb/__init__.py rename to tests/io/lance/__init__.py diff --git a/tests/io/lancedb/conftest.py b/tests/io/lance/conftest.py similarity index 100% rename from tests/io/lancedb/conftest.py rename to tests/io/lance/conftest.py diff --git a/tests/io/lancedb/test_blob_v2_policy.py b/tests/io/lance/test_blob_v2_policy.py similarity index 100% rename from tests/io/lancedb/test_blob_v2_policy.py rename to tests/io/lance/test_blob_v2_policy.py diff --git a/tests/io/lancedb/test_expression_point_lookup.py b/tests/io/lance/test_expression_point_lookup.py similarity index 100% rename from tests/io/lancedb/test_expression_point_lookup.py rename to tests/io/lance/test_expression_point_lookup.py diff --git a/tests/io/lancedb/test_fast_path_merge.py b/tests/io/lance/test_fast_path_merge.py similarity index 100% rename from tests/io/lancedb/test_fast_path_merge.py rename to tests/io/lance/test_fast_path_merge.py diff --git a/tests/io/lancedb/test_lance_batching.py b/tests/io/lance/test_lance_batching.py similarity index 100% rename from tests/io/lancedb/test_lance_batching.py rename to tests/io/lance/test_lance_batching.py diff --git a/tests/io/lancedb/test_lance_blob_v2_e2e.py b/tests/io/lance/test_lance_blob_v2_e2e.py similarity index 100% rename from tests/io/lancedb/test_lance_blob_v2_e2e.py rename to tests/io/lance/test_lance_blob_v2_e2e.py diff --git a/tests/io/lancedb/test_lance_blob_v2_read.py b/tests/io/lance/test_lance_blob_v2_read.py similarity index 100% rename from tests/io/lancedb/test_lance_blob_v2_read.py rename to tests/io/lance/test_lance_blob_v2_read.py diff --git a/tests/io/lancedb/test_lance_blob_v2_write.py b/tests/io/lance/test_lance_blob_v2_write.py similarity index 100% rename from tests/io/lancedb/test_lance_blob_v2_write.py rename to tests/io/lance/test_lance_blob_v2_write.py diff --git a/tests/io/lancedb/test_lancedb_compaction.py b/tests/io/lance/test_lance_compaction.py similarity index 100% rename from tests/io/lancedb/test_lancedb_compaction.py rename to tests/io/lance/test_lance_compaction.py diff --git a/tests/io/lancedb/test_lancedb_count_pushdown_coverage.py b/tests/io/lance/test_lance_count_pushdown_coverage.py similarity index 88% rename from tests/io/lancedb/test_lancedb_count_pushdown_coverage.py rename to tests/io/lance/test_lance_count_pushdown_coverage.py index 076a7b4..779b2e9 100644 --- a/tests/io/lancedb/test_lancedb_count_pushdown_coverage.py +++ b/tests/io/lance/test_lance_count_pushdown_coverage.py @@ -11,7 +11,7 @@ from daft import col from daft.daft import CountMode from daft.recordbatch import RecordBatch -from daft_lance.lance_scan import LanceDBScanOperator, _lancedb_count_result_function +from daft_lance.lance_scan import LanceScanOperator, _lance_count_result_function class TestLanceCountResultFunction: @@ -30,20 +30,20 @@ def test_dataset_path(self, tmp_path_factory): lance.write_dataset(pa.Table.from_pydict(test_data), tmp_dir) yield str(tmp_dir) - def test_lancedb_count_no_filters_direct_call(self, test_dataset_path): + def test_lance_count_no_filters_direct_call(self, test_dataset_path): """Test that no filters list is handled correctly.""" ds = lance.dataset(test_dataset_path) - result_generator = _lancedb_count_result_function(ds.uri, None, "count") + result_generator = _lance_count_result_function(ds.uri, None, "count") result_batch = next(result_generator) record_batch = RecordBatch._from_pyrecordbatch(result_batch) result_dict = record_batch.to_pydict() assert result_dict["count"][0] == 6 - def test_lancedb_count_with_filters_path(self, test_dataset_path): + def test_lance_count_with_filters_path(self, test_dataset_path): """Test that filters list is handled correctly.""" ds = lance.dataset(test_dataset_path) filter_expr = pc.greater(pc.field("age"), pc.scalar(30)) - result_generator = _lancedb_count_result_function(ds.uri, None, "count", filter=filter_expr) + result_generator = _lance_count_result_function(ds.uri, None, "count", filter=filter_expr) result_batch = next(result_generator) record_batch = RecordBatch._from_pyrecordbatch(result_batch) result_dict = record_batch.to_pydict() @@ -52,7 +52,7 @@ def test_lancedb_count_with_filters_path(self, test_dataset_path): def test_unsupported_count_mode_fallback(self, test_dataset_path): """Test that unsupported count mode falls back to regular scan.""" ds = lance.dataset(test_dataset_path) - scan_op = LanceDBScanOperator(ds) + scan_op = LanceScanOperator(ds) with patch.object(scan_op, "supported_count_modes", return_value=[CountMode.All]): with patch("daft_lance.lance_scan.logger") as mock_logger: @@ -77,7 +77,7 @@ def test_unsupported_count_mode_fallback(self, test_dataset_path): def test_empty_filters_list_handling(self, test_dataset_path): """Test that empty filters list is handled correctly.""" ds = lance.dataset(test_dataset_path) - scan_op = LanceDBScanOperator(ds) + scan_op = LanceScanOperator(ds) pushed, remaining = scan_op.push_filters([]) assert len(pushed) == 0 diff --git a/tests/io/lancedb/test_lancedb_custom_task.py b/tests/io/lance/test_lance_custom_task.py similarity index 100% rename from tests/io/lancedb/test_lancedb_custom_task.py rename to tests/io/lance/test_lance_custom_task.py diff --git a/tests/io/lancedb/test_lance_data_sink_internals.py b/tests/io/lance/test_lance_data_sink_internals.py similarity index 100% rename from tests/io/lancedb/test_lance_data_sink_internals.py rename to tests/io/lance/test_lance_data_sink_internals.py diff --git a/tests/io/lancedb/test_lance_data_sink_storage_versions.py b/tests/io/lance/test_lance_data_sink_storage_versions.py similarity index 100% rename from tests/io/lancedb/test_lance_data_sink_storage_versions.py rename to tests/io/lance/test_lance_data_sink_storage_versions.py diff --git a/tests/io/lancedb/test_lancedb_factory_function.py b/tests/io/lance/test_lance_factory_function.py similarity index 95% rename from tests/io/lancedb/test_lancedb_factory_function.py rename to tests/io/lance/test_lance_factory_function.py index a02f8a8..c68ca80 100644 --- a/tests/io/lancedb/test_lancedb_factory_function.py +++ b/tests/io/lance/test_lance_factory_function.py @@ -7,7 +7,7 @@ import pytest from daft.recordbatch import RecordBatch -from daft_lance.lance_scan import _lancedb_table_factory_function +from daft_lance.lance_scan import _lance_table_factory_function # Import-or-skip lance once at module level so individual tests don't need to do this lance = pytest.importorskip("lance") @@ -57,7 +57,7 @@ def test_reconstructs_dataset_and_reads_fragments( # Collect records from factory out_batches = list( - _lancedb_table_factory_function( + _lance_table_factory_function( ds_uri=ds_direct.uri, open_kwargs=open_kwargs, fragment_ids=frag_ids, @@ -89,7 +89,7 @@ def test_raises_when_no_fragments(tmp_path_factory): with pytest.raises(RuntimeError) as ei: list( - _lancedb_table_factory_function( + _lance_table_factory_function( ds_uri=ds.uri, open_kwargs=None, fragment_ids=[], @@ -110,7 +110,7 @@ def test_invalid_fragment_id_raises(tmp_path_factory): with pytest.raises(Exception) as ei: list( - _lancedb_table_factory_function( + _lance_table_factory_function( ds_uri=ds.uri, open_kwargs=None, fragment_ids=[invalid_id], @@ -141,7 +141,7 @@ def test_open_kwargs_version_selects_correct_version(tmp_path_factory): frag_ids_v1 = [f.fragment_id for f in ds_v1.get_fragments()] out_batches = list( - _lancedb_table_factory_function( + _lance_table_factory_function( ds_uri=ds_v2.uri, open_kwargs={"version": 1}, fragment_ids=frag_ids_v1, diff --git a/tests/io/lancedb/test_lance_merge_evolution.py b/tests/io/lance/test_lance_merge_evolution.py similarity index 100% rename from tests/io/lancedb/test_lance_merge_evolution.py rename to tests/io/lance/test_lance_merge_evolution.py diff --git a/tests/io/lancedb/test_lancedb_point_lookup.py b/tests/io/lance/test_lance_point_lookup.py similarity index 97% rename from tests/io/lancedb/test_lancedb_point_lookup.py rename to tests/io/lance/test_lance_point_lookup.py index 92b07ff..782add2 100644 --- a/tests/io/lancedb/test_lancedb_point_lookup.py +++ b/tests/io/lance/test_lance_point_lookup.py @@ -20,7 +20,7 @@ def lance_dataset(tmp_path_factory): def _scan(ds): - return lance_scan.LanceDBScanOperator(ds) + return lance_scan.LanceScanOperator(ds) @pytest.mark.parametrize( @@ -102,7 +102,7 @@ def test_scanner_without_fragments(lance_dataset, idx_type): arrow_filter = Expression._from_pyexpr((col("id") == 2)._expr).to_arrow_expr() # Invoke factory with fragment_ids=None to exercise index-driven fragment selection - gen = lance_scan._lancedb_table_factory_function( + gen = lance_scan._lance_table_factory_function( ds.uri, getattr(ds, "_lance_open_kwargs", None), None, diff --git a/tests/io/lancedb/test_lancedb_reads.py b/tests/io/lance/test_lance_reads.py similarity index 88% rename from tests/io/lancedb/test_lancedb_reads.py rename to tests/io/lance/test_lance_reads.py index ca5cb7b..2e73c5e 100644 --- a/tests/io/lancedb/test_lancedb_reads.py +++ b/tests/io/lance/test_lance_reads.py @@ -18,18 +18,18 @@ def lance_dataset_path(tmp_path_factory): yield str(tmp_dir) -def test_lancedb_read(lance_dataset_path): +def test_lance_read(lance_dataset_path): df = daft.read_lance(lance_dataset_path) assert df.to_pydict() == data -def test_lancedb_read_column_selection(lance_dataset_path): +def test_lance_read_column_selection(lance_dataset_path): df = daft.read_lance(lance_dataset_path) df = df.select("vector") assert df.to_pydict() == {"vector": data["vector"]} -def test_lancedb_read_filter(lance_dataset_path): +def test_lance_read_filter(lance_dataset_path): df = daft.read_lance(lance_dataset_path) df = df.where((df["lat"] > 45) & (df["lat"] < 90)) df = df.select("vector") @@ -68,7 +68,7 @@ def large_lance_dataset_path(tmp_path_factory): (10000, 10), ], ) -def test_lancedb_read_limit_large_dataset(large_lance_dataset_path, limit_size, expected_scan_tasks): +def test_lance_read_limit_large_dataset(large_lance_dataset_path, limit_size, expected_scan_tasks): """Test limit operation on a large Lance dataset with multiple fragments.""" import io @@ -97,12 +97,12 @@ def test_lancedb_read_limit_large_dataset(large_lance_dataset_path, limit_size, assert result["big_int"] == expected_big_ints -def test_lancedb_with_version(lance_dataset_path): +def test_lance_with_version(lance_dataset_path): df = daft.read_lance(uri=lance_dataset_path, version=1) assert df.to_pydict() == data # test pushdown filters with limit and projection - def test_lancedb_read_pushdown(lance_dataset_path, capsys): + def test_lance_read_pushdown(lance_dataset_path, capsys): df = daft.read_lance(lance_dataset_path) df = daft.sql("SELECT vector, lat + 1 as lat_plus_1 FROM df where long < 3 limit 1") df.explain(show_all=True) @@ -139,7 +139,7 @@ def test_lancedb_read_pushdown(lance_dataset_path, capsys): ) -def test_lancedb_read_parallelism_fragment_merging(large_lance_dataset_path): +def test_lance_read_parallelism_fragment_merging(large_lance_dataset_path): """Test parallelism parameter reduces scan tasks by merging fragments.""" df = daft.read_lance(uri=large_lance_dataset_path, fragment_group_size=3) result = df.to_pydict() @@ -147,7 +147,7 @@ def test_lancedb_read_parallelism_fragment_merging(large_lance_dataset_path): assert len(result["big_int"]) == 10000 -def test_lancedb_read_filter_passthrough(tmp_path): +def test_lance_read_filter_passthrough(tmp_path): """Test passing raw SQL filter string to Lance via default_scan_options.""" import lance @@ -164,8 +164,8 @@ def test_lancedb_read_filter_passthrough(tmp_path): assert sorted(res["id"]) == [1, 2] -def test_lancedb_geo_projection_and_filter(tmp_path): - """Test LanceDB read with Geo projection and filter via default_scan_options.""" +def test_lance_geo_projection_and_filter(tmp_path): + """Test Lance read with Geo projection and filter via default_scan_options.""" import lance try: @@ -250,7 +250,7 @@ def test_lancedb_geo_projection_and_filter(tmp_path): assert abs(res["distance"][0]) < 1e-6 -class TestLanceDBCountPushdown: +class TestLanceCountPushdown: tmp_data = { "a": ["a", "b", "c", "d", "e", None], "b": [1, None, 3, None, 5, 6], @@ -271,7 +271,7 @@ def test_count_all_pushdown(self, dataset_path, capsys): df.explain(True) actual = capsys.readouterr() assert "Pushdowns: {projection: [b], aggregation: count(col(b), All)}" in actual.out - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"count": [6]} @@ -284,7 +284,7 @@ def test_count_column_no_pushdown(self, dataset_path, capsys): df.explain(True) actual = capsys.readouterr() assert "Pushdowns: {projection: [a], aggregation: count(col(a), All)}" not in actual.out - assert "_lancedb_count_result_function" not in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" not in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"a": [5]} @@ -297,7 +297,7 @@ def test_count_pushdown_with_select(self, dataset_path, capsys): df.explain(True) actual = capsys.readouterr() assert "Pushdowns: {projection: [b], aggregation: count(col(b), All)}" in actual.out - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"count": [6]} @@ -310,7 +310,7 @@ def test_count_with_filter_pushdown(self, dataset_path, capsys): _ = capsys.readouterr() df.explain(True) actual = capsys.readouterr() - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path assert "Filter pushdown = is_null(col(b))" in actual.out assert "Aggregation pushdown = count(col(b), All)" in actual.out @@ -325,7 +325,7 @@ def test_count_with_or_filter_pushdown(self, dataset_path, capsys): _ = capsys.readouterr() df.explain(True) actual = capsys.readouterr() - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path assert "Filter pushdown = is_null(col(b)) | is_null(col(c))" in actual.out assert "Aggregation pushdown = count(col(b), All)" in actual.out @@ -341,7 +341,7 @@ def test_count_with_and_filter_pushdown(self, dataset_path, capsys): _ = capsys.readouterr() df.explain(True) actual = capsys.readouterr() - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path assert "Aggregation pushdown" in actual.out assert "Filter pushdown" in actual.out @@ -356,7 +356,7 @@ def test_count_with_filter_and_select_pushdown(self, dataset_path, capsys): _ = capsys.readouterr() df.explain(True) actual = capsys.readouterr() - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"count": [4]} @@ -373,7 +373,7 @@ def test_edge_case_empty_dataset(self, tmp_path_factory, capsys): df.explain(True) actual = capsys.readouterr() assert "Pushdowns: {projection: [a], aggregation: count(col(a), All)}" in actual.out - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"count": [0]} @@ -386,14 +386,14 @@ def test_count_1(self, dataset_path, capsys): df.explain(True) actual = capsys.readouterr() assert "Pushdowns: {projection: [b], aggregation: count(col(b), All)}" in actual.out - assert "_lancedb_count_result_function" in actual.out # Accept daft or daft_lance module path + assert "_lance_count_result_function" in actual.out # Accept daft or daft_lance module path result = df.to_pydict() assert result == {"count": [6]} @pytest.mark.parametrize("enable_strict_filter_pushdown", [True, False]) -def test_lancedb_filter_then_limit_behavior(lance_dataset_path, enable_strict_filter_pushdown): +def test_lance_filter_then_limit_behavior(lance_dataset_path, enable_strict_filter_pushdown): """Ensure filter is applied before limit for Lance reads.""" daft.context.set_planning_config(enable_strict_filter_pushdown=enable_strict_filter_pushdown) df = daft.read_lance(lance_dataset_path) @@ -408,7 +408,7 @@ def test_lancedb_filter_then_limit_behavior(lance_dataset_path, enable_strict_fi assert result3 == {"vector": [[0.2, 1.8]], "lat": [40.1], "long": [-74.1], "big_int": [2]} -def test_lancedb_limit_with_filter_and_fragment_grouping_single_task(large_lance_dataset_path): +def test_lance_limit_with_filter_and_fragment_grouping_single_task(large_lance_dataset_path): """Validate filter+limit correctness when fragment grouping is enabled.""" df = daft.read_lance(uri=large_lance_dataset_path, fragment_group_size=4) df = df.filter("big_int = 999").limit(1).select("big_int") diff --git a/tests/io/lancedb/test_lancedb_scalar_index.py b/tests/io/lance/test_lance_scalar_index.py similarity index 99% rename from tests/io/lancedb/test_lancedb_scalar_index.py rename to tests/io/lance/test_lance_scalar_index.py index ffa011f..dfd3327 100644 --- a/tests/io/lancedb/test_lancedb_scalar_index.py +++ b/tests/io/lance/test_lance_scalar_index.py @@ -399,9 +399,9 @@ def test_build_distributed_index_fts_type(self, multi_fragment_lance_dataset): """Test building distributed FTS (Full-Text Search) index.""" dataset_uri = multi_fragment_lance_dataset - # Skip this test if FTS is not supported in the current LanceDB version - # This test will be enabled when LanceDB version supports FTS index type - pytest.skip("FTS index type may not be supported in the current LanceDB version") + # Skip this test if FTS is not supported in the current Lance version + # This test will be enabled when Lance version supports FTS index type + pytest.skip("FTS index type may not be supported in the current Lance version") # Build distributed FTS index create_scalar_index( diff --git a/tests/io/lancedb/test_lancedb_vector_search.py b/tests/io/lance/test_lance_vector_search.py similarity index 100% rename from tests/io/lancedb/test_lancedb_vector_search.py rename to tests/io/lance/test_lance_vector_search.py diff --git a/tests/io/lancedb/test_lancedb_writes.py b/tests/io/lance/test_lance_writes.py similarity index 96% rename from tests/io/lancedb/test_lancedb_writes.py rename to tests/io/lance/test_lance_writes.py index 82c78b1..0b84016 100644 --- a/tests/io/lancedb/test_lancedb_writes.py +++ b/tests/io/lance/test_lance_writes.py @@ -41,7 +41,7 @@ def minio_create_bucket(minio_io_config, bucket_name): yield -def test_lancedb_roundtrip(lance_dataset_path): +def test_lance_roundtrip(lance_dataset_path): df1 = daft.from_pydict(data1) df2 = daft.from_pydict(data2) df1.write_lance(lance_dataset_path, mode="create") @@ -52,7 +52,7 @@ def test_lancedb_roundtrip(lance_dataset_path): @pytest.mark.integration() -def test_lancedb_minio(minio_io_config): +def test_lance_minio(minio_io_config): df1 = daft.from_pydict(data1) df2 = daft.from_pydict(data2) bucket_name = "lance" @@ -64,7 +64,7 @@ def test_lancedb_minio(minio_io_config): assert df_loaded.to_pydict() == df1.concat(df2).to_pydict() -def test_lancedb_write_with_schema(lance_dataset_path): +def test_lance_write_with_schema(lance_dataset_path): """Writing a dataframe to lance with a user-provided schema with lance encodings.""" data = { "vector": [1.1, 1.2], @@ -99,7 +99,7 @@ def test_lancedb_write_with_schema(lance_dataset_path): assert compress_field_metadata[b"lance-encoding:compression"] == b"zstd" -def test_lancedb_write_blob(lance_dataset_path): +def test_lance_write_blob(lance_dataset_path): schema = pa.schema( [ pa.field("blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}), @@ -130,7 +130,7 @@ def test_lancedb_write_blob(lance_dataset_path): assert f.read() == expected -def test_lancedb_write_string(lance_dataset_path): +def test_lance_write_string(lance_dataset_path): import lance # Make lance dataset with a string column @@ -149,7 +149,7 @@ def test_lancedb_write_string(lance_dataset_path): assert df_loaded.to_pydict() == data -def test_lancedb_write_incompatible_schema(lance_dataset_path): +def test_lance_write_incompatible_schema(lance_dataset_path): import lance # Make lance dataset with an int and a string column @@ -166,7 +166,7 @@ def test_lancedb_write_incompatible_schema(lance_dataset_path): df.write_lance(lance_dataset_path, mode="append") -def test_lancedb_write_with_create_append_mode(lance_dataset_path): +def test_lance_write_with_create_append_mode(lance_dataset_path): import lance # Make lance dataset with a string column diff --git a/tests/io/lancedb/test_mem_wal_writes.py b/tests/io/lance/test_mem_wal_writes.py similarity index 100% rename from tests/io/lancedb/test_mem_wal_writes.py rename to tests/io/lance/test_mem_wal_writes.py From a77d7c106fc38e87a9614a64835ccb735d98de9a Mon Sep 17 00:00:00 2001 From: fanng <“fanng@apache.org”> Date: Thu, 25 Jun 2026 18:46:24 +0800 Subject: [PATCH 2/4] fix: preserve deprecated LanceDB scan names --- daft_lance/lance_scan.py | 47 +++++++++++++++++++ .../test_lance_count_pushdown_coverage.py | 30 +++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/daft_lance/lance_scan.py b/daft_lance/lance_scan.py index dbbc98d..d3617e0 100644 --- a/daft_lance/lance_scan.py +++ b/daft_lance/lance_scan.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import warnings from collections.abc import Iterator from typing import Any @@ -128,6 +129,26 @@ def _lance_count_result_function( yield result_batch._recordbatch +def _lancedb_table_factory_function(*args: Any, **kwargs: Any) -> Iterator[PyRecordBatch]: + warnings.warn( + "_lancedb_table_factory_function is deprecated and will be removed in a future release. " + "Use _lance_table_factory_function instead.", + DeprecationWarning, + stacklevel=2, + ) + return _lance_table_factory_function(*args, **kwargs) + + +def _lancedb_count_result_function(*args: Any, **kwargs: Any) -> Iterator[PyRecordBatch]: + warnings.warn( + "_lancedb_count_result_function is deprecated and will be removed in a future release. " + "Use _lance_count_result_function instead.", + DeprecationWarning, + stacklevel=2, + ) + return _lance_count_result_function(*args, **kwargs) + + class LanceScanOperator(ScanOperator, SupportsPushdownFilters): def __init__( self, @@ -486,3 +507,29 @@ def _estimate_size_bytes(fragment: lance.LanceFragment) -> int: return 0 return sum(file.file_size_bytes for file in fragment.metadata.files if file.file_size_bytes is not None) + + +class LanceDBScanOperator(LanceScanOperator): + def __init__( + self, + ds: lance.LanceDataset, + fragment_group_size: int | None = None, + include_fragment_id: bool | None = False, + ): + warnings.warn( + "LanceDBScanOperator is deprecated and will be removed in a future release. " + "Use LanceScanOperator instead.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__( + ds, + fragment_group_size=fragment_group_size, + include_fragment_id=include_fragment_id, + ) + + def name(self) -> str: + return "LanceDBScanOperator" + + def display_name(self) -> str: + return f"LanceDBScanOperator({self._ds.uri})" diff --git a/tests/io/lance/test_lance_count_pushdown_coverage.py b/tests/io/lance/test_lance_count_pushdown_coverage.py index 779b2e9..c7fcf4e 100644 --- a/tests/io/lance/test_lance_count_pushdown_coverage.py +++ b/tests/io/lance/test_lance_count_pushdown_coverage.py @@ -11,7 +11,12 @@ from daft import col from daft.daft import CountMode from daft.recordbatch import RecordBatch -from daft_lance.lance_scan import LanceScanOperator, _lance_count_result_function +from daft_lance.lance_scan import ( + LanceDBScanOperator, + LanceScanOperator, + _lance_count_result_function, + _lancedb_count_result_function, +) class TestLanceCountResultFunction: @@ -49,6 +54,18 @@ def test_lance_count_with_filters_path(self, test_dataset_path): result_dict = record_batch.to_pydict() assert result_dict["count"][0] == 4 + def test_deprecated_lancedb_count_result_function_alias(self, test_dataset_path): + """Test that the old count helper name remains available.""" + ds = lance.dataset(test_dataset_path) + + with pytest.deprecated_call(match="_lancedb_count_result_function is deprecated"): + result_generator = _lancedb_count_result_function(ds.uri, None, "count") + + result_batch = next(result_generator) + record_batch = RecordBatch._from_pyrecordbatch(result_batch) + result_dict = record_batch.to_pydict() + assert result_dict["count"][0] == 6 + def test_unsupported_count_mode_fallback(self, test_dataset_path): """Test that unsupported count mode falls back to regular scan.""" ds = lance.dataset(test_dataset_path) @@ -84,6 +101,17 @@ def test_empty_filters_list_handling(self, test_dataset_path): assert len(remaining) == 0 assert scan_op._pushed_filters is None + def test_deprecated_lancedb_scan_operator_alias(self, test_dataset_path): + """Test that the old public scan operator name remains available.""" + ds = lance.dataset(test_dataset_path) + + with pytest.deprecated_call(match="LanceDBScanOperator is deprecated"): + scan_op = LanceDBScanOperator(ds) + + assert isinstance(scan_op, LanceScanOperator) + assert scan_op.name() == "LanceDBScanOperator" + assert scan_op.display_name() == f"LanceDBScanOperator({ds.uri})" + def test_very_large_filter_expression(self, test_dataset_path): """Test that very large filter expressions are handled correctly.""" df = daft.read_lance(test_dataset_path) From 8adc23c185180803c48ef654255c733bab0934f2 Mon Sep 17 00:00:00 2001 From: fanng <“fanng@apache.org”> Date: Thu, 25 Jun 2026 23:32:11 +0800 Subject: [PATCH 3/4] fix(fast-path): three bugs in FastPathFragmentWriter and _can_use_fast_path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug 1 — type erasure (FastPathFragmentWriter.__call__): pa.array(s.to_pylist()) drops Arrow type info from daft Series. For fixed_size_list[N], Python floats become float64 and the fixed-size list becomes a variable-length list, which causes lance's stream merger to fail with a row-count mismatch (1 != N). Fix: use s.to_arrow() / combine_chunks() to preserve the declared daft return type. Bug 2 — collect() side-effect (_can_use_fast_path): len(df.collect()) materialises results into df._result_cache. One-shot Python objects (e.g. lance BlobFile) cached there are exhausted; a subsequent groupby().map_groups() that re-uses the same df object receives stale objects and produces null values. Fix: use df.count_rows(), which does not set _result_cache. Bug 3 — next_fid under-counts child field IDs (FastPathFragmentWriter): lance_schema.fields() returns only top-level fields. Nested types (struct, blob-v2, etc.) have child fields with their own IDs. max(f.id() for top-level f) skips these, so next_fid can collide with an existing child field ID. The committed fragment metadata then maps the new column file to the wrong schema field, causing lance to read null for the new column. Fix: recurse into children when computing the max field ID. --- daft_lance/lance_merge_column.py | 36 +++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/daft_lance/lance_merge_column.py b/daft_lance/lance_merge_column.py index c0e0cb9..afcad7c 100644 --- a/daft_lance/lance_merge_column.py +++ b/daft_lance/lance_merge_column.py @@ -267,10 +267,18 @@ def __call__(self, *cols: Any) -> list[dict[str, bytes]]: rowaddrs = rowaddr_col.to_pylist() if hasattr(rowaddr_col, "to_pylist") else list(rowaddr_col) - # Build table of new columns + # Build table of new columns, preserving the Arrow type from the daft Series. + # pa.array(s.to_pylist()) loses type information: fixed_size_list[N] + # becomes list because Python floats are float64 and list structure is + # inferred from Python lists. Using s.to_arrow() avoids this type erasure. arrays = [] for s in data_cols: - arr = _pa.array(s.to_pylist() if hasattr(s, "to_pylist") else list(s)) + if hasattr(s, "to_arrow"): + arr = s.to_arrow() + if isinstance(arr, _pa.ChunkedArray): + arr = arr.combine_chunks() + else: + arr = _pa.array(list(s)) arrays.append(arr) tbl = _pa.table({name: arr for name, arr in zip(self.new_column_names, arrays)}) @@ -306,8 +314,22 @@ def __call__(self, *cols: Any) -> list[dict[str, bytes]]: writer.write_batch(b) file_size = os.path.getsize(filepath) - # Determine field IDs for the new columns - next_fid = max(f.id() for f in self.lance_ds.lance_schema.fields()) + 1 + # Determine field IDs for the new columns. + # lance_schema.fields() returns only top-level fields; child fields of + # nested types (struct, fixed_size_list, …) have their own IDs and must + # be included in the max-scan, otherwise next_fid collides with an + # existing child field ID and the committed fragment metadata will map + # the new file to the wrong schema field. + def _max_field_id(fields: Any) -> int: + best = -1 + for f in fields: + best = max(best, f.id()) + children = f.children() if callable(getattr(f, "children", None)) else [] + if children: + best = max(best, _max_field_id(children)) + return best + + next_fid = _max_field_id(self.lance_ds.lance_schema.fields()) + 1 # Stitch new data file into fragment metadata new_file_entry = { @@ -342,7 +364,11 @@ def _can_use_fast_path( return False if "fragment_id" not in df.column_names: return False - df_row_count = len(df.collect()) + # Use count_rows() rather than collect() to avoid caching one-shot Python + # objects (e.g. BlobFile) into df._result_cache. collect() caches its result + # so a subsequent groupby().map_groups() would receive the same exhausted + # Python objects instead of fresh ones. + df_row_count = df.count_rows() ds_row_count = lance_ds.count_rows() return df_row_count == ds_row_count From 24a7f1b83dc1b6ba992fb3f5cba25ddbd8769096 Mon Sep 17 00:00:00 2001 From: fanng <“fanng@apache.org”> Date: Fri, 26 Jun 2026 07:37:46 +0800 Subject: [PATCH 4/4] test(fast-path): add regression tests for three FastPathFragmentWriter bugs Covers: - fixed_size_list[N] type erasure via pa.array(s.to_pylist()) - next_fid collision with nested struct child field IDs - BlobFile exhaustion caused by collect() side-effect in _can_use_fast_path --- tests/io/lance/test_fast_path_merge.py | 121 +++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/io/lance/test_fast_path_merge.py b/tests/io/lance/test_fast_path_merge.py index cde4235..74a2e5e 100644 --- a/tests/io/lance/test_fast_path_merge.py +++ b/tests/io/lance/test_fast_path_merge.py @@ -587,3 +587,124 @@ def test_scan_fragments_individually(self, ds_path): combined = sorted(zip(all_ids, all_doubled)) assert combined == [(1, 2), (2, 4), (3, 6), (4, 8)] + + +# --------------------------------------------------------------------------- +# 10. Regressions +# --------------------------------------------------------------------------- + + +class TestRegressions: + def test_fixed_size_list_float32_type_preserved(self, ds_path): + """Bug: pa.array(s.to_pylist()) erases fixed_size_list[N] → list. + + Python floats are float64 and list structure is inferred from nested Python lists, + so the Arrow type is lost. The fix uses s.to_arrow().combine_chunks() which + preserves the declared daft return type exactly. + """ + N = 8 + ds = create_dataset(ds_path, [{"id": [1, 2, 3]}]) + df = read_with_metadata(ds_path) + + @daft.func.batch(return_dtype=daft.DataType.fixed_size_list(daft.DataType.float32(), N)) + def _make_vec(ids): + import numpy as np + + return [np.array([float(i)] * N, dtype=np.float32) for i in ids.to_pylist()] + + df = df.with_column("embedding", _make_vec(daft.col("id"))) + ds2 = merge_columns_from_df(df, ds, ds_path) + + field = ds2.schema.field("embedding") + # Type must be preserved: fixed_size_list[N], NOT list + assert pa.types.is_fixed_size_list(field.type), f"Expected fixed_size_list, got {field.type}" + assert field.type.list_size == N + assert field.type.value_type == pa.float32(), f"Expected float32 values, got {field.type.value_type}" + + result = ds2.to_table().sort_by("id").to_pydict() + for i, emb in zip(result["id"], result["embedding"]): + assert emb is not None, f"Embedding for id={i} is null" + assert len(emb) == N + for v in emb: + assert pytest.approx(float(i), rel=1e-5) == v + + def test_next_fid_skips_nested_child_field_ids(self, ds_path): + """Bug: next_fid only scanned top-level lance_schema.fields(), missing child IDs. + + A struct column with M children occupies M+1 field IDs (1 for parent + M for children). + With top-level-only scan: max_id = num_top_level_fields - 1, next_fid collides with a + child field ID, and the new file's fragment metadata maps to the wrong schema field → + the new column reads back as null. + + The fix uses recursive _max_field_id() that includes all descendants. + """ + # Schema: id(fid=0), meta struct(fid=1) with children meta.a(fid=2), meta.b(fid=3). + # Top-level scan → max=1, next_fid=2 (WRONG: collides with meta.a). + # Recursive scan → max=3, next_fid=4 (CORRECT). + struct_type = pa.struct([("a", pa.int64()), ("b", pa.utf8())]) + rows = [{"a": i, "b": f"s{i}"} for i in [1, 2, 3]] + tbl = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int64()), + "meta": pa.array(rows, type=struct_type), + } + ) + lance.write_dataset(tbl, ds_path) + ds = lance.dataset(ds_path) + + df = read_with_metadata(ds_path) + df = df.with_column("score", daft.col("id").cast(daft.DataType.int64()) * 10) + ds2 = merge_columns_from_df(df, ds, ds_path) + + result = ds2.to_table().sort_by("id").to_pydict() + assert result["score"] == [10, 20, 30], f"score is null or wrong (field ID collision): {result['score']}" + assert result["id"] == [1, 2, 3] + + def test_blob_pipeline_merge_produces_nonnull_results(self, tmp_path_factory): + """Bug: _can_use_fast_path called df.collect(), caching one-shot BlobFile objects. + + Daft caches collect() results in df._result_cache. The subsequent + groupby().map_groups() received the same exhausted BlobFile instances, + whose .read() returned b'', causing downstream UDFs to produce null/wrong output. + + The fix uses df.count_rows() which does NOT set _result_cache, so the pipeline + re-executes fresh and BlobFile objects are materialized again from scratch. + """ + from daft_lance._blob import take_blobs as _take_blobs + + blob_path = str(tmp_path_factory.mktemp("blob_ds")) + payloads = [b"hello", b"world", b"foo"] + tbl = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int64()), + "blob": lance.blob_array(payloads), + } + ) + lance.write_dataset(tbl, blob_path, data_storage_version="2.2") + ds = lance.dataset(blob_path) + + df = daft.read_lance( + blob_path, + include_fragment_id=True, + default_scan_options={"with_row_id": True, "with_row_address": True}, + ) + df = _take_blobs(df, ds, "blob") + + @daft.func.batch(return_dtype=daft.DataType.int64()) + def _blob_len(blobs): + return [len(b.read()) if b is not None else -1 for b in blobs.to_pylist()] + + df = df.with_column("blob_len", _blob_len(daft.col("blob"))) + ds2 = merge_columns_from_df( + df.select("fragment_id", "_rowaddr", "blob_len"), + ds, + blob_path, + ) + + result = ds2.to_table().sort_by("id").to_pydict() + expected_lens = [len(p) for p in payloads] + # If the old bug (collect() caching) were present, blob_len would be 0 for all rows + # because exhausted BlobFile.read() returns b''. + assert result["blob_len"] == expected_lens, ( + f"blob_len wrong (BlobFile exhaustion bug?): got {result['blob_len']}, expected {expected_lens}" + )