From 4ad7a7f069c6cb7e32259dca5035b91318121b36 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 1 Aug 2024 17:30:01 -0700 Subject: [PATCH 1/5] add hacky support for resource_barrier API --- dask_expr/_collection.py | 9 ++++++++- dask_expr/_core.py | 36 ++++++++++++++++++++++++++++++++++++ dask_expr/_expr.py | 19 ++++++++++++++++++- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 002aff4da..025b98dea 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -474,7 +474,11 @@ def compute(self, fuse=True, **kwargs): if not isinstance(out, Scalar): out = out.repartition(npartitions=1) out = out.optimize(fuse=fuse) - return DaskMethodsMixin.compute(out, **kwargs) + return DaskMethodsMixin.compute( + out, + task_resources=out.expr.collect_task_resources(), + **kwargs, + ) def analyze(self, filename: str | None = None, format: str | None = None) -> None: """Outputs statistics about every node in the expression. @@ -2494,6 +2498,9 @@ def to_delayed(self, optimize_graph=True): """ return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph) + def resource_barrier(self, resources): + return new_collection(expr.ResourceBarrier(self.expr, resources)) + def to_backend(self, backend: str | None = None, **kwargs): """Move to a new DataFrame backend diff --git a/dask_expr/_core.py b/dask_expr/_core.py index cc4fdadac..17e3241b5 100644 --- a/dask_expr/_core.py +++ b/dask_expr/_core.py @@ -738,6 +738,42 @@ def walk(self) -> Generator[Expr]: yield node + def collect_task_resources(self) -> dict: + from dask_expr._expr import ResourceBarrier + + if not self.find_operations(ResourceBarrier): + return {} + + resources_annotation = {} + known_resources = {self._name: None} + stack = [self] + seen = set() + while stack: + node = stack.pop() + if node._name in seen: + continue + seen.add(node._name) + + if isinstance(node, ResourceBarrier): + known_resources[node._name] = node.resources + resources = known_resources[node._name] + + if resources: + resources_annotation.update( + { + k: (resources(k) if callable(resources) else resources) + for k in node._layer().keys() + } + ) + + for dep in node.dependencies(): + if not isinstance(dep, ResourceBarrier): + # TODO: Protect against conflicting resources + known_resources[dep._name] = resources + stack.append(dep) + + return resources_annotation + def find_operations(self, operation: type | tuple[type]) -> Generator[Expr]: """Search the expression graph for a specific operation type diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index c0366432a..ce633871d 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1303,6 +1303,23 @@ def operation(df): return df.copy(deep=True) +class ResourceBarrier(Elemwise): + _parameters = ["frame", "resources"] + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True + + def _task(self, index: int): + return (self.frame._name, index) + + @property + def _meta(self): + return self.frame._meta + + def _divisions(self): + return self.frame.divisions + + class RenameSeries(Elemwise): _parameters = ["frame", "index", "sorted_index"] _defaults = {"sorted_index": False} @@ -3128,7 +3145,7 @@ def are_co_aligned(*exprs): def is_valid_blockwise_op(expr): return isinstance(expr, Blockwise) and not isinstance( - expr, (FromPandas, FromArray, FromDelayed) + expr, (FromPandas, FromArray, FromDelayed, ResourceBarrier) ) From d66675af4964f290da3664485787e1cfe2cc35cb Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 1 Aug 2024 17:52:27 -0700 Subject: [PATCH 2/5] support persist --- dask_expr/_collection.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 025b98dea..80539600f 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -445,7 +445,11 @@ def __array__(self, dtype=None, **kwargs): def persist(self, fuse=True, **kwargs): out = self.optimize(fuse=fuse) - return DaskMethodsMixin.persist(out, **kwargs) + return DaskMethodsMixin.persist( + out, + task_resources=out.expr.collect_task_resources(), + **kwargs, + ) def compute(self, fuse=True, **kwargs): """Compute this DataFrame. From d1980f081011410d5dcc9030dcd86ca88eb53f88 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 2 Aug 2024 10:24:24 -0700 Subject: [PATCH 3/5] revise direction of resource inheritance --- dask_expr/_collection.py | 9 ++++++++- dask_expr/_core.py | 16 ++-------------- dask_expr/_expr.py | 24 +++++++++++++++++++++--- dask_expr/io/io.py | 9 +++++++++ dask_expr/io/parquet.py | 4 ++++ 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 80539600f..06299bcb9 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -2503,7 +2503,7 @@ def to_delayed(self, optimize_graph=True): return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph) def resource_barrier(self, resources): - return new_collection(expr.ResourceBarrier(self.expr, resources)) + return new_collection(expr.ElemwiseResourceBarrier(self.expr, resources)) def to_backend(self, backend: str | None = None, **kwargs): """Move to a new DataFrame backend @@ -5203,6 +5203,7 @@ def read_parquet( filesystem="fsspec", engine=None, arrow_to_pandas=None, + resources=None, **kwargs, ): """ @@ -5397,6 +5398,7 @@ def read_parquet( to_parquet pyarrow.parquet.ParquetDataset """ + from dask_expr.io.io import IOResourceBarrier from dask_expr.io.parquet import ( ReadParquetFSSpec, ReadParquetPyarrowFS, @@ -5416,6 +5418,9 @@ def read_parquet( if op == "in" and not isinstance(val, (set, list, tuple)): raise TypeError("Value of 'in' filter must be a list, set or tuple.") + if resources is not None: + resources = IOResourceBarrier(resources) + if ( isinstance(filesystem, pa_fs.FileSystem) or isinstance(filesystem, str) @@ -5465,6 +5470,7 @@ def read_parquet( pyarrow_strings_enabled=pyarrow_strings_enabled(), kwargs=kwargs, _series=isinstance(columns, str), + resource_requirement=resources, ) ) @@ -5487,6 +5493,7 @@ def read_parquet( engine=_set_parquet_engine(engine), kwargs=kwargs, _series=isinstance(columns, str), + resource_requirement=resources, ) ) diff --git a/dask_expr/_core.py b/dask_expr/_core.py index 17e3241b5..446d81d3e 100644 --- a/dask_expr/_core.py +++ b/dask_expr/_core.py @@ -739,13 +739,7 @@ def walk(self) -> Generator[Expr]: yield node def collect_task_resources(self) -> dict: - from dask_expr._expr import ResourceBarrier - - if not self.find_operations(ResourceBarrier): - return {} - resources_annotation = {} - known_resources = {self._name: None} stack = [self] seen = set() while stack: @@ -754,11 +748,8 @@ def collect_task_resources(self) -> dict: continue seen.add(node._name) - if isinstance(node, ResourceBarrier): - known_resources[node._name] = node.resources - resources = known_resources[node._name] - - if resources: + resources = node._resources + if resources is not None: resources_annotation.update( { k: (resources(k) if callable(resources) else resources) @@ -767,9 +758,6 @@ def collect_task_resources(self) -> dict: ) for dep in node.dependencies(): - if not isinstance(dep, ResourceBarrier): - # TODO: Protect against conflicting resources - known_resources[dep._name] = resources stack.append(dep) return resources_annotation diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index ce633871d..c832fe3a6 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -52,7 +52,7 @@ random_state_data, ) from pandas.errors import PerformanceWarning -from tlz import merge_sorted, partition, unique +from tlz import merge, merge_sorted, partition, unique from dask_expr import _core as core from dask_expr._util import ( @@ -87,6 +87,11 @@ def ndim(self): except AttributeError: return 0 + @functools.cached_property + def _resources(self): + dep_resources = merge(dep._resources or {} for dep in self.dependencies()) + return dep_resources or None + def __dask_keys__(self): return [(self._name, i) for i in range(self.npartitions)] @@ -1303,12 +1308,25 @@ def operation(df): return df.copy(deep=True) -class ResourceBarrier(Elemwise): - _parameters = ["frame", "resources"] +class ResourceBarrier(Expr): + @property + def _resources(self): + raise NotImplementedError() + + def __str__(self): + return f"{type(self).__name__}({self._resources})" + + +class ElemwiseResourceBarrier(Elemwise, ResourceBarrier): + _parameters = ["frame", "resource_spec"] _projection_passthrough = True _filter_passthrough = True _preserves_partitioning_information = True + @property + def _resources(self): + return self.resource_spec + def _task(self, index: int): return (self.frame._name, index) diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index a28b8762e..ff0c365d5 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -19,6 +19,7 @@ Literal, PartitionsFiltered, Projection, + ResourceBarrier, determine_column_projection, no_default, ) @@ -31,6 +32,14 @@ def __str__(self): return f"{type(self).__name__}({self._name[-7:]})" +class IOResourceBarrier(ResourceBarrier): + _parameters = ["resource_spec"] + + @property + def _resources(self): + return self.resource_spec + + class FromGraph(IO): """A DataFrame created from an opaque Dask task graph diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index dc75a47a0..caa859c41 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -828,6 +828,7 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas", "pyarrow_strings_enabled", "kwargs", + "resource_requirement", "_partitions", "_series", "_dataset_info_cache", @@ -844,6 +845,7 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas": None, "pyarrow_strings_enabled": True, "kwargs": None, + "resource_requirement": None, "_partitions": None, "_series": False, "_dataset_info_cache": None, @@ -1253,6 +1255,7 @@ class ReadParquetFSSpec(ReadParquet): "filesystem", "engine", "kwargs", + "resource_requirement", "_partitions", "_series", "_dataset_info_cache", @@ -1273,6 +1276,7 @@ class ReadParquetFSSpec(ReadParquet): "filesystem": "fsspec", "engine": "pyarrow", "kwargs": None, + "resource_requirement": None, "_partitions": None, "_series": False, "_dataset_info_cache": None, From 8bded48923a91c518702ec45d7ef3935226f0335 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 2 Aug 2024 10:54:06 -0700 Subject: [PATCH 4/5] update public docstrings --- dask_expr/_collection.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 06299bcb9..31b64ced7 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -2503,6 +2503,27 @@ def to_delayed(self, optimize_graph=True): return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph) def resource_barrier(self, resources): + """Define a resource-constraint barrier + + Parameters + ---------- + resources : dict + Resource constraint (e.g. ``{GPU: 1}``). + + Notes + ----- + 1. This resources constraint will be applied to all tasks + generated by operations after this point (or until the + `resource_barrier` API is used again). + 2. This resource constraint will superceed any other + resource constraints defined with global annotations. + 3. Creating a resource barrier will not block optimizations + like column projection or predicate pushdown. We assume + both projection and filtering are resource agnostic. + 4. Resource constraints only apply to distributed execution. + 5. The scheduler will only try to satisfy resource constraints + when relevant worker resources exist. + """ return new_collection(expr.ElemwiseResourceBarrier(self.expr, resources)) def to_backend(self, backend: str | None = None, **kwargs): @@ -5383,6 +5404,10 @@ def read_parquet( arrow_to_pandas: dict, default None Dictionary of options to use when converting from ``pyarrow.Table`` to a pandas ``DataFrame`` object. Only used by the "arrow" engine. + resources: dict, default None + Resource constraint to apply to the generated IO tasks and all + future operations. The `resource_barrier` API can be used to modify + future resource constraints after the collection is created. **kwargs: dict (of dicts) Options to pass through to ``engine.read_partitions`` as stand-alone key-word arguments. Note that these options will be ignored by the From d3d5f864b55efd1c72ad170d612dbb548f2a9ea8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 2 Aug 2024 12:13:06 -0700 Subject: [PATCH 5/5] fix fusion --- dask_expr/io/io.py | 10 +++++++++- dask_expr/io/parquet.py | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index ff0c365d5..87261bbbf 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -39,6 +39,9 @@ class IOResourceBarrier(ResourceBarrier): def _resources(self): return self.resource_spec + def _layer(self): + return {} + class FromGraph(IO): """A DataFrame created from an opaque Dask task graph @@ -158,7 +161,8 @@ def _tune_up(self, parent): class FusedParquetIO(FusedIO): - _parameters = ["_expr"] + _parameters = ["_expr", "resource_requirement"] + _defaults = {"resource_requirement": None} @functools.cached_property def _name(self): @@ -168,6 +172,10 @@ def _name(self): + _tokenize_deterministic(*self.operands) ) + def dependencies(self): + dep = self.resource_requirement + return [] if dep is None else [dep] + @staticmethod def _load_multiple_files( frag_filters, diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index caa859c41..9fd5290e1 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -1100,7 +1100,7 @@ def _tune_up(self, parent): return if isinstance(parent, FusedParquetIO): return - return parent.substitute(self, FusedParquetIO(self)) + return parent.substitute(self, FusedParquetIO(self, self.resource_requirement)) @cached_property def fragments(self):