diff --git a/.github/workflows/dask_test.yaml b/.github/workflows/dask_test.yaml
index 6c03f465d..a4a25d44d 100644
--- a/.github/workflows/dask_test.yaml
+++ b/.github/workflows/dask_test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-0
 
       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
       - name: Install current main versions of distributed
         run: python -m pip install git+https://github.com/dask/distributed
 
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index b8cca6b61..07c8dcc27 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-1
 
       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
         if: ${{ matrix.environment-file == 'ci/environment.yml' }}
 
       - name: Install current main versions of distributed
diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index 9ece42f61..db186de42 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -5131,23 +5131,17 @@ def read_csv(
     path,
     *args,
     header="infer",
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadCSV
+    from dask.dataframe.io.csv import read_csv as _read_csv
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadCSV(
-            path,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-            dataframe_backend="pandas",
-        )
+    return _read_csv(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        **kwargs,
     )
 
 
@@ -5156,23 +5150,18 @@ def read_table(
     path,
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadTable
+    from dask.dataframe.io.csv import read_table as _read_table
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadTable(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_table(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
 
@@ -5181,23 +5170,18 @@ def read_fwf(
     path,
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadFwf
+    from dask.dataframe.io.csv import read_fwf as _read_fwf
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadFwf(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_fwf(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
 
diff --git a/dask_expr/io/csv.py b/dask_expr/io/csv.py
index eef227953..989b01b7b 100644
--- a/dask_expr/io/csv.py
+++ b/dask_expr/io/csv.py
@@ -1,154 +1,3 @@
-import functools
-import operator
-
-from dask._task_spec import Task
-from dask.typing import Key
-
-from dask_expr._expr import Projection
-from dask_expr._util import _convert_to_list
-from dask_expr.io.io import BlockwiseIO, PartitionsFiltered
-
-
-class ReadCSV(PartitionsFiltered, BlockwiseIO):
-    _parameters = [
-        "filename",
-        "columns",
-        "header",
-        "dtype_backend",
-        "_partitions",
-        "storage_options",
-        "kwargs",
-        "_series",
-        "dataframe_backend",
-    ]
-    _defaults = {
-        "columns": None,
-        "header": "infer",
-        "dtype_backend": None,
-        "kwargs": None,
-        "_partitions": None,
-        "storage_options": None,
-        "_series": False,
-        "dataframe_backend": "pandas",
-    }
-    _absorb_projections = True
-
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_csv
-
-        return read_csv
-
-    @functools.cached_property
-    def _ddf(self):
-        from dask import config
-
-        # Temporary hack to simplify logic
-        with config.set({"dataframe.backend": self.dataframe_backend}):
-            kwargs = (
-                {"dtype_backend": self.dtype_backend}
-                if self.dtype_backend is not None
-                else {}
-            )
-            if self.kwargs is not None:
-                kwargs.update(self.kwargs)
-
-            columns = _convert_to_list(self.operand("columns"))
-            if columns is None:
-                pass
-            elif "include_path_column" in self.kwargs:
-                flag = self.kwargs["include_path_column"]
-                if flag is True:
-                    column_to_remove = "path"
-                elif isinstance(flag, str):
-                    column_to_remove = flag
-                else:
-                    column_to_remove = None
-
-                columns = [c for c in columns if c != column_to_remove]
-
-            if not columns:
-                meta = self.operation(
-                    self.filename,
-                    header=self.header,
-                    storage_options=self.storage_options,
-                    **kwargs,
-                )._meta
-                columns = [list(meta.columns)[0]]
-
-            usecols = kwargs.pop("usecols", None)
-            if usecols is not None and columns is not None:
-                columns = [col for col in columns if col in usecols]
-            elif usecols:
-                columns = usecols
-
-            return self.operation(
-                self.filename,
-                usecols=columns,
-                header=self.header,
-                storage_options=self.storage_options,
-                **kwargs,
-            )
-
-    @functools.cached_property
-    def _meta(self):
-        return self._ddf._meta
-
-    def _simplify_up(self, parent, dependents):
-        if isinstance(parent, Projection):
-            kwargs = self.kwargs
-            # int usecols are positional, so block projections
-            if kwargs.get("usecols", None) is not None and isinstance(
-                kwargs.get("usecols")[0], int
-            ):
-                return
-            return super()._simplify_up(parent, dependents)
-
-    @functools.cached_property
-    def columns(self):
-        columns_operand = self.operand("columns")
-        if columns_operand is None:
-            try:
-                return list(self._ddf._meta.columns)
-            except AttributeError:
-                return []
-        else:
-            return _convert_to_list(columns_operand)
-
-    def _divisions(self):
-        return self._ddf.divisions
-
-    @functools.cached_property
-    def _tasks(self):
-        from dask._task_spec import convert_legacy_graph
-
-        return list(convert_legacy_graph(self._ddf.dask.to_dict()).values())
-
-    def _filtered_task(self, name: Key, index: int) -> Task:
-        if self._series:
-            return Task(name, operator.getitem, self._tasks[index], self.columns[0])
-        t = self._tasks[index]
-        if t.key != name:
-            return Task(name, lambda x: x, t)
-        return t
-
-
-class ReadTable(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_table
-
-        return read_table
-
-
-class ReadFwf(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_fwf
-
-        return read_fwf
-
-
 def to_csv(
     df,
     filename,
diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py
index 6ed4f6972..58f48de2b 100644
--- a/dask_expr/io/tests/test_io.py
+++ b/dask_expr/io/tests/test_io.py
@@ -23,7 +23,7 @@
     read_parquet,
 )
 from dask_expr._expr import Expr, Replace
-from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet
+from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library
 
 # Set DataFrame backend for this module
@@ -257,7 +257,7 @@ def test_to_dask_array(optimize):
 
 @pytest.mark.parametrize(
     "fmt,read_func,read_cls",
-    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)],
+    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, FromMap)],
 )
 def test_combine_similar(tmpdir, fmt, read_func, read_cls):
     pdf = pd.DataFrame(
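Not part of the patch above: a minimal usage sketch of the slimmed-down wrappers. It assumes a dask build in which read_csv is implemented on top of from_map (the phofl/dask@read-csv-legacy branch pinned in the CI changes above); the file name data.csv and the column names are purely illustrative.

# Illustrative only, not included in this diff.
import pandas as pd

import dask_expr

# Write a tiny CSV fixture to read back.
pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).to_csv("data.csv", index=False)

# dask_expr.read_csv now forwards straight to dask.dataframe.io.csv.read_csv,
# so the resulting expression is built from FromMap rather than the removed
# ReadCSV/ReadTable/ReadFwf expressions (see the test_io.py change above).
df = dask_expr.read_csv("data.csv", usecols=["a"])

print(df.expr)       # inspect the expression tree
print(df.compute())  # materialize the selected column as pandas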