diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py
index 299f3eaff..120757377 100644
--- a/dask_expr/_backends.py
+++ b/dask_expr/_backends.py
@@ -88,12 +88,37 @@ def create_array_collection(expr):
     # to infer that we want to create an array is the only way that is guaranteed
     # to be a general solution.
     # We can get rid of this when we have an Array expression
-    from dask.dataframe.core import new_dd_object
+    from dask.highlevelgraph import HighLevelGraph
+    from dask.layers import Blockwise

     result = expr.optimize()
-    return new_dd_object(
-        result.__dask_graph__(), result._name, result._meta, result.divisions
-    )
+    dsk = result.__dask_graph__()
+    name = result._name
+    meta = result._meta
+    divisions = result.divisions
+    import dask.array as da
+
+    chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta.shape[1:])
+    if len(chunks) > 1:
+        if isinstance(dsk, HighLevelGraph):
+            layer = dsk.layers[name]
+        else:
+            # dask-expr provides a dict only
+            layer = dsk
+        if isinstance(layer, Blockwise):
+            layer.new_axes["j"] = chunks[1][0]
+            layer.output_indices = layer.output_indices + ("j",)
+        else:
+            from dask._task_spec import Alias, Task
+
+            suffix = (0,) * (len(chunks) - 1)
+            for i in range(len(chunks[0])):
+                task = layer.get((name, i))
+                new_key = (name, i) + suffix
+                if isinstance(task, Task):
+                    task = Alias(new_key, task.key)
+                layer[new_key] = task
+    return da.Array(dsk, name=name, chunks=chunks, dtype=meta.dtype)


 @get_collection_type.register(np.ndarray)
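For orientation, the replacement above stops round-tripping through `new_dd_object` and builds a `dask.array.Array` directly: first-axis chunk lengths are unknown (`np.nan`) because partition row counts cannot be known without computing, while trailing axes come from the empty meta's shape. A minimal standalone sketch of that chunk construction, with made-up divisions and meta shape:

    import numpy as np

    # Hypothetical inputs: three partitions and a two-column frame.
    divisions = (0, 10, 20, 30)   # len(divisions) - 1 == 3 partitions
    meta_shape = (0, 2)           # empty meta: 0 rows, 2 columns

    # Same expression as in create_array_collection above: NaN placeholders
    # for unknown row counts, concrete sizes for the trailing axes.
    chunks = ((np.nan,) * (len(divisions) - 1),) + tuple(
        (d,) for d in meta_shape[1:]
    )
    print(chunks)  # ((nan, nan, nan), (2,))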
" - "The legacy implementation as a whole is deprecated and will be removed, making " - "this method unnecessary.", - FutureWarning, - ) - df = self.optimize(**optimize_kwargs) if optimize else self - return new_dd_object(df.dask, df._name, df._meta, df.divisions) - def to_dask_array( self, lengths=None, meta=None, optimize: bool = True, **optimize_kwargs ) -> Array: @@ -5052,28 +5036,6 @@ def from_dict( ) -def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase: - """Create a dask-expr collection from a legacy dask-dataframe collection - - Parameters - ---------- - optimize - Whether to optimize the graph before conversion. - """ - warnings.warn( - "from_legacy_dataframe is deprecated and will be removed in a future release. " - "The legacy implementation as a whole is deprecated and will be removed, making " - "this method unnecessary.", - FutureWarning, - ) - graph = ddf.dask - if optimize: - graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__()) - return from_graph( - graph, ddf._meta, ddf.divisions, ddf.__dask_keys__(), key_split(ddf._name) - ) - - def from_dask_array(x, columns=None, index=None, meta=None): """Create a Dask DataFrame from a Dask Array. @@ -5793,7 +5755,7 @@ def merge_asof( del kwargs["on"] for o in [left_on, right_on]: - if isinstance(o, _Frame): + if isinstance(o, FrameBase): raise NotImplementedError( "Dask collections not currently allowed in merge columns" ) @@ -6544,3 +6506,8 @@ def _compute_partition_stats( return (mins, maxes, lens) else: return (non_empty_mins, non_empty_maxes, lens) + + +@get_parallel_type.register(FrameBase) +def get_parallel_type_frame(o): + return get_parallel_type(o._meta) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index c32cea1d0..e18777961 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -24,12 +24,11 @@ is_dataframe_like, is_index_like, is_series_like, - make_meta, pd_split, safe_head, total_mem_usage, ) -from dask.dataframe.dispatch import meta_nonempty +from dask.dataframe.dispatch import make_meta, meta_nonempty from dask.dataframe.rolling import CombinedOutput, _head_timedelta, overlap_chunk from dask.dataframe.shuffle import drop_overlap, get_overlap from dask.dataframe.utils import ( diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index 8d00e051b..48acd5dbe 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -9,7 +9,6 @@ from dask._task_spec import Task from dask.core import flatten from dask.dataframe.core import ( - GROUP_KEYS_DEFAULT, _concat, apply_and_enforce, is_dataframe_like, @@ -17,6 +16,7 @@ ) from dask.dataframe.dispatch import concat, make_meta, meta_nonempty from dask.dataframe.groupby import ( + GROUP_KEYS_DEFAULT, _agg_finalize, _aggregate_docstring, _apply_chunk, diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index ce4a0de7e..6d2bcf224 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -19,10 +19,9 @@ is_dataframe_like, is_index_like, is_series_like, - make_meta, - meta_nonempty, total_mem_usage, ) +from dask.dataframe.dispatch import make_meta, meta_nonempty from dask.typing import no_default from dask.utils import M, apply, funcname diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py index a522ff0cb..0ca19022d 100644 --- a/dask_expr/_shuffle.py +++ b/dask_expr/_shuffle.py @@ -10,8 +10,8 @@ import tlz as toolz from dask import compute from dask._task_spec import Task, TaskRef -from dask.dataframe.core import _concat, make_meta -from dask.dataframe.dispatch import is_categorical_dtype +from 
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index c32cea1d0..e18777961 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -24,12 +24,11 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
     pd_split,
     safe_head,
     total_mem_usage,
 )
-from dask.dataframe.dispatch import meta_nonempty
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.dataframe.rolling import CombinedOutput, _head_timedelta, overlap_chunk
 from dask.dataframe.shuffle import drop_overlap, get_overlap
 from dask.dataframe.utils import (
diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py
index 8d00e051b..48acd5dbe 100644
--- a/dask_expr/_groupby.py
+++ b/dask_expr/_groupby.py
@@ -9,7 +9,6 @@
 from dask._task_spec import Task
 from dask.core import flatten
 from dask.dataframe.core import (
-    GROUP_KEYS_DEFAULT,
     _concat,
     apply_and_enforce,
     is_dataframe_like,
@@ -17,6 +16,7 @@
 )
 from dask.dataframe.dispatch import concat, make_meta, meta_nonempty
 from dask.dataframe.groupby import (
+    GROUP_KEYS_DEFAULT,
     _agg_finalize,
     _aggregate_docstring,
     _apply_chunk,
diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py
index ce4a0de7e..6d2bcf224 100644
--- a/dask_expr/_reductions.py
+++ b/dask_expr/_reductions.py
@@ -19,10 +19,9 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
-    meta_nonempty,
     total_mem_usage,
 )
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.typing import no_default
 from dask.utils import M, apply, funcname

diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py
index a522ff0cb..0ca19022d 100644
--- a/dask_expr/_shuffle.py
+++ b/dask_expr/_shuffle.py
@@ -10,8 +10,8 @@
 import tlz as toolz
 from dask import compute
 from dask._task_spec import Task, TaskRef
-from dask.dataframe.core import _concat, make_meta
-from dask.dataframe.dispatch import is_categorical_dtype
+from dask.dataframe.core import _concat
+from dask.dataframe.dispatch import is_categorical_dtype, make_meta
 from dask.dataframe.shuffle import (
     barrier,
     collect,
diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py
index 6a2e9d6ce..11f73d0b1 100644
--- a/dask_expr/io/io.py
+++ b/dask_expr/io/io.py
@@ -9,7 +9,8 @@
 from dask._task_spec import List, Task
 from dask.dataframe import methods
 from dask.dataframe._pyarrow import to_pyarrow_string
-from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta
+from dask.dataframe.core import apply_and_enforce, is_dataframe_like
+from dask.dataframe.dispatch import make_meta
 from dask.dataframe.io.io import _meta_from_array, sorted_division_locations
 from dask.typing import Key
 from dask.utils import funcname, is_series_like
diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py
index 58f48de2b..b70f14b63 100644
--- a/dask_expr/io/tests/test_io.py
+++ b/dask_expr/io/tests/test_io.py
@@ -15,14 +15,13 @@
     from_array,
     from_dask_array,
     from_dict,
-    from_legacy_dataframe,
     from_map,
     from_pandas,
     optimize,
     read_csv,
     read_parquet,
 )
-from dask_expr._expr import Expr, Replace
+from dask_expr._expr import Replace
 from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library
@@ -227,25 +226,6 @@ def test_parquet_complex_filters(tmpdir):
     assert_eq(got.optimize(), expect)


-@pytest.mark.parametrize("optimize", [True, False])
-def test_from_legacy_dataframe(optimize):
-    ddf = dd.from_dict({"a": range(100)}, npartitions=10)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        df = from_legacy_dataframe(ddf, optimize=optimize)
-    assert isinstance(df.expr, Expr)
-    assert_eq(df, ddf)
-
-
-@pytest.mark.parametrize("optimize", [True, False])
-def test_to_legacy_dataframe(optimize):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
-    df = from_pandas(pdf, npartitions=2)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        ddf = df.to_legacy_dataframe(optimize=optimize)
-    assert isinstance(ddf, dd.core.DataFrame)
-    assert_eq(df, ddf)
-
-
 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_dask_array(optimize):
     pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
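A recurring theme in the hunks above is importing `make_meta` and `meta_nonempty` from `dask.dataframe.dispatch` rather than `dask.dataframe.core`, which matches the deprecation of the legacy implementation noted earlier in the patch. A small sketch of what these two dispatch functions do (the pandas frame here is only illustrative):

    import pandas as pd
    from dask.dataframe.dispatch import make_meta, meta_nonempty

    # make_meta produces an empty object with the same schema; meta_nonempty
    # produces a small non-empty stand-in with matching dtypes.
    meta = make_meta(pd.DataFrame({"a": [1, 2, 3]}))
    print(len(meta))            # 0 -- empty frame, same schema
    print(meta_nonempty(meta))  # non-empty dummy frame, same dtypes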