From 15a4e1e25dbfd2584896af086238bd9c7d404148 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:23:33 +0100 Subject: [PATCH 1/9] Remove legacy conversion functions --- dask_expr/_backends.py | 8 ++++++- dask_expr/_collection.py | 45 +---------------------------------- dask_expr/io/tests/test_io.py | 22 +---------------- 3 files changed, 9 insertions(+), 66 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 299f3eaff..2e08ad392 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -4,8 +4,9 @@ import pandas as pd from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint -from dask.dataframe.dispatch import to_pandas_dispatch +from dask.dataframe.dispatch import get_parallel_type, to_pandas_dispatch +from dask_expr import FrameBase from dask_expr._dispatch import get_collection_type from dask_expr._expr import ToBackend @@ -130,3 +131,8 @@ def get_collection_type_object(_): @get_collection_type.register_lazy("cudf") def _register_cudf(): import dask_cudf # noqa: F401 + + +@get_parallel_type.register(FrameBase) +def get_parallel_type_frame(o): + return get_parallel_type(o._meta) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 9ece42f61..b7d1a1518 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -23,7 +23,6 @@ from dask.dataframe.core import ( _concat, _convert_to_numeric, - _Frame, _repr_data_series, _sqrt_and_convert_to_timedelta, check_divisions, @@ -32,7 +31,6 @@ is_dataframe_like, is_series_like, meta_warning, - new_dd_object, ) from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty from dask.dataframe.multi import warn_dtype_mismatch @@ -1370,25 +1368,6 @@ def repartition( Repartition(self, npartitions, divisions, force, partition_size, freq) ) - def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame: - """Convert to a legacy dask-dataframe collection - - Parameters - ---------- - optimize - Whether to optimize the underlying `Expr` object before conversion. - **optimize_kwargs - Key-word arguments to pass through to `optimize`. - """ - warnings.warn( - "to_legacy_dataframe is deprecated and will be removed in a future release. " - "The legacy implementation as a whole is deprecated and will be removed, making " - "this method unnecessary.", - FutureWarning, - ) - df = self.optimize(**optimize_kwargs) if optimize else self - return new_dd_object(df.dask, df._name, df._meta, df.divisions) - def to_dask_array( self, lengths=None, meta=None, optimize: bool = True, **optimize_kwargs ) -> Array: @@ -5052,28 +5031,6 @@ def from_dict( ) -def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase: - """Create a dask-expr collection from a legacy dask-dataframe collection - - Parameters - ---------- - optimize - Whether to optimize the graph before conversion. - """ - warnings.warn( - "from_legacy_dataframe is deprecated and will be removed in a future release. " - "The legacy implementation as a whole is deprecated and will be removed, making " - "this method unnecessary.", - FutureWarning, - ) - graph = ddf.dask - if optimize: - graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__()) - return from_graph( - graph, ddf._meta, ddf.divisions, ddf.__dask_keys__(), key_split(ddf._name) - ) - - def from_dask_array(x, columns=None, index=None, meta=None): """Create a Dask DataFrame from a Dask Array. @@ -5809,7 +5766,7 @@ def merge_asof( del kwargs["on"] for o in [left_on, right_on]: - if isinstance(o, _Frame): + if isinstance(o, FrameBase): raise NotImplementedError( "Dask collections not currently allowed in merge columns" ) diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py index 6ed4f6972..c78f29557 100644 --- a/dask_expr/io/tests/test_io.py +++ b/dask_expr/io/tests/test_io.py @@ -15,14 +15,13 @@ from_array, from_dask_array, from_dict, - from_legacy_dataframe, from_map, from_pandas, optimize, read_csv, read_parquet, ) -from dask_expr._expr import Expr, Replace +from dask_expr._expr import Replace from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet from dask_expr.tests._util import _backend_library @@ -227,25 +226,6 @@ def test_parquet_complex_filters(tmpdir): assert_eq(got.optimize(), expect) -@pytest.mark.parametrize("optimize", [True, False]) -def test_from_legacy_dataframe(optimize): - ddf = dd.from_dict({"a": range(100)}, npartitions=10) - with pytest.warns(FutureWarning, match="is deprecated"): - df = from_legacy_dataframe(ddf, optimize=optimize) - assert isinstance(df.expr, Expr) - assert_eq(df, ddf) - - -@pytest.mark.parametrize("optimize", [True, False]) -def test_to_legacy_dataframe(optimize): - pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) - df = from_pandas(pdf, npartitions=2) - with pytest.warns(FutureWarning, match="is deprecated"): - ddf = df.to_legacy_dataframe(optimize=optimize) - assert isinstance(ddf, dd.core.DataFrame) - assert_eq(df, ddf) - - @pytest.mark.parametrize("optimize", [True, False]) def test_to_dask_array(optimize): pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) From a6939b1ba30d882d17d3a0a71bf3852ddfa8d097 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:36:09 +0100 Subject: [PATCH 2/9] Update --- dask_expr/_backends.py | 8 +------- dask_expr/_collection.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 2e08ad392..299f3eaff 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -4,9 +4,8 @@ import pandas as pd from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint -from dask.dataframe.dispatch import get_parallel_type, to_pandas_dispatch +from dask.dataframe.dispatch import to_pandas_dispatch -from dask_expr import FrameBase from dask_expr._dispatch import get_collection_type from dask_expr._expr import ToBackend @@ -131,8 +130,3 @@ def get_collection_type_object(_): @get_collection_type.register_lazy("cudf") def _register_cudf(): import dask_cudf # noqa: F401 - - -@get_parallel_type.register(FrameBase) -def get_parallel_type_frame(o): - return get_parallel_type(o._meta) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index b7d1a1518..453653da4 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -32,7 +32,12 @@ is_series_like, meta_warning, ) -from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty +from dask.dataframe.dispatch import ( + get_parallel_type, + is_categorical_dtype, + make_meta, + meta_nonempty, +) from dask.dataframe.multi import warn_dtype_mismatch from dask.dataframe.utils import ( AttributeNotImplementedError, @@ -6517,3 +6522,8 @@ def _compute_partition_stats( return (mins, maxes, lens) else: return (non_empty_mins, non_empty_maxes, lens) + + +@get_parallel_type.register(FrameBase) +def get_parallel_type_frame(o): + return get_parallel_type(o._meta) From 8a3219386f8683c09e0745ecbf3adde03fbd7104 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 16 Dec 2024 19:36:15 +0100 Subject: [PATCH 3/9] Update _collection.py --- dask_expr/_collection.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 453653da4..7e7268b15 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -6522,8 +6522,3 @@ def _compute_partition_stats( return (mins, maxes, lens) else: return (non_empty_mins, non_empty_maxes, lens) - - -@get_parallel_type.register(FrameBase) -def get_parallel_type_frame(o): - return get_parallel_type(o._meta) From 3597bd3b099e3f44fb4199bc5bbb346d4aef61dd Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 16 Dec 2024 19:43:55 +0100 Subject: [PATCH 4/9] Update _collection.py --- dask_expr/_collection.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 7e7268b15..ff7016058 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5771,7 +5771,7 @@ def merge_asof( del kwargs["on"] for o in [left_on, right_on]: - if isinstance(o, FrameBase): + if isinstance(o, _Frame): raise NotImplementedError( "Dask collections not currently allowed in merge columns" ) @@ -6522,3 +6522,8 @@ def _compute_partition_stats( return (mins, maxes, lens) else: return (non_empty_mins, non_empty_maxes, lens) + + +@get_parallel_type.register(FrameBase) +def get_parallel_type_frame(o): + return get_parallel_type(o._meta) From e5651c20eef8e200501c8068f7c71674b911d550 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 16 Dec 2024 19:45:34 +0100 Subject: [PATCH 5/9] Fixup --- dask_expr/_collection.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index ff7016058..59027db54 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -23,6 +23,7 @@ from dask.dataframe.core import ( _concat, _convert_to_numeric, + _Frame, _repr_data_series, _sqrt_and_convert_to_timedelta, check_divisions, From ed5917f81e26c75f6e07e87760aac7b40fa1acba Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:49:21 +0100 Subject: [PATCH 6/9] Remove _Frame references --- dask_expr/_collection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 59027db54..453653da4 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -23,7 +23,6 @@ from dask.dataframe.core import ( _concat, _convert_to_numeric, - _Frame, _repr_data_series, _sqrt_and_convert_to_timedelta, check_divisions, @@ -5772,7 +5771,7 @@ def merge_asof( del kwargs["on"] for o in [left_on, right_on]: - if isinstance(o, _Frame): + if isinstance(o, FrameBase): raise NotImplementedError( "Dask collections not currently allowed in merge columns" ) From b8c69bb012a638c1c29d54661a04fcfd58574a09 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:51:54 +0100 Subject: [PATCH 7/9] Fixup --- dask_expr/_collection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 453653da4..426f108f8 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -27,7 +27,6 @@ _sqrt_and_convert_to_timedelta, check_divisions, has_parallel_type, - is_arraylike, is_dataframe_like, is_series_like, meta_warning, @@ -55,6 +54,7 @@ derived_from, get_default_shuffle_method, get_meta_library, + is_arraylike, key_split, maybe_pluralize, memory_repr, From db458d5625c380d54873fa60ecfef3392e066d55 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:56:12 +0100 Subject: [PATCH 8/9] Fixup imports --- dask_expr/_expr.py | 3 +-- dask_expr/_groupby.py | 2 +- dask_expr/_reductions.py | 3 +-- dask_expr/_shuffle.py | 4 ++-- dask_expr/io/io.py | 3 ++- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 3a42d3d96..7b96bf84a 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -24,12 +24,11 @@ is_dataframe_like, is_index_like, is_series_like, - make_meta, pd_split, safe_head, total_mem_usage, ) -from dask.dataframe.dispatch import meta_nonempty +from dask.dataframe.dispatch import make_meta, meta_nonempty from dask.dataframe.rolling import CombinedOutput, _head_timedelta, overlap_chunk from dask.dataframe.shuffle import drop_overlap, get_overlap from dask.dataframe.utils import ( diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index 49f342f35..4fe1ab998 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -9,7 +9,6 @@ from dask._task_spec import Task from dask.core import flatten from dask.dataframe.core import ( - GROUP_KEYS_DEFAULT, _concat, apply_and_enforce, is_dataframe_like, @@ -17,6 +16,7 @@ ) from dask.dataframe.dispatch import concat, make_meta, meta_nonempty from dask.dataframe.groupby import ( + GROUP_KEYS_DEFAULT, _agg_finalize, _aggregate_docstring, _apply_chunk, diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index ce4a0de7e..6d2bcf224 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -19,10 +19,9 @@ is_dataframe_like, is_index_like, is_series_like, - make_meta, - meta_nonempty, total_mem_usage, ) +from dask.dataframe.dispatch import make_meta, meta_nonempty from dask.typing import no_default from dask.utils import M, apply, funcname diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py index a522ff0cb..0ca19022d 100644 --- a/dask_expr/_shuffle.py +++ b/dask_expr/_shuffle.py @@ -10,8 +10,8 @@ import tlz as toolz from dask import compute from dask._task_spec import Task, TaskRef -from dask.dataframe.core import _concat, make_meta -from dask.dataframe.dispatch import is_categorical_dtype +from dask.dataframe.core import _concat +from dask.dataframe.dispatch import is_categorical_dtype, make_meta from dask.dataframe.shuffle import ( barrier, collect, diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index 6a2e9d6ce..11f73d0b1 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -9,7 +9,8 @@ from dask._task_spec import List, Task from dask.dataframe import methods from dask.dataframe._pyarrow import to_pyarrow_string -from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta +from dask.dataframe.core import apply_and_enforce, is_dataframe_like +from dask.dataframe.dispatch import make_meta from dask.dataframe.io.io import _meta_from_array, sorted_division_locations from dask.typing import Key from dask.utils import funcname, is_series_like From 727cc3fb901d61fbbe48b6cc8e8fde2a6da6039b Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:50:53 +0100 Subject: [PATCH 9/9] Fixup imports --- dask_expr/_backends.py | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 299f3eaff..120757377 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -88,12 +88,37 @@ def create_array_collection(expr): # to infer that we want to create an array is the only way that is guaranteed # to be a general solution. # We can get rid of this when we have an Array expression - from dask.dataframe.core import new_dd_object + from dask.highlevelgraph import HighLevelGraph + from dask.layers import Blockwise result = expr.optimize() - return new_dd_object( - result.__dask_graph__(), result._name, result._meta, result.divisions - ) + dsk = result.__dask_graph__() + name = result._name + meta = result._meta + divisions = result.divisions + import dask.array as da + + chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta.shape[1:]) + if len(chunks) > 1: + if isinstance(dsk, HighLevelGraph): + layer = dsk.layers[name] + else: + # dask-expr provides a dict only + layer = dsk + if isinstance(layer, Blockwise): + layer.new_axes["j"] = chunks[1][0] + layer.output_indices = layer.output_indices + ("j",) + else: + from dask._task_spec import Alias, Task + + suffix = (0,) * (len(chunks) - 1) + for i in range(len(chunks[0])): + task = layer.get((name, i)) + new_key = (name, i) + suffix + if isinstance(task, Task): + task = Alias(new_key, task.key) + layer[new_key] = task + return da.Array(dsk, name=name, chunks=chunks, dtype=meta.dtype) @get_collection_type.register(np.ndarray)