From c111cf3e21dcdb8952c43155195ec03f39ced2d3 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Mon, 9 Mar 2026 10:32:08 -0700 Subject: [PATCH 1/4] only impl ArrayType --- python/pyspark/sql/conversion.py | 74 +++++++++++++--- python/pyspark/sql/tests/test_conversion.py | 97 +++++++++++++++++++++ 2 files changed, 157 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py index 7e6287fce07ef..1de582dd36186 100644 --- a/python/pyspark/sql/conversion.py +++ b/python/pyspark/sql/conversion.py @@ -65,6 +65,7 @@ ) if TYPE_CHECKING: + import numpy as np import pyarrow as pa import pandas as pd @@ -1470,7 +1471,6 @@ def convert( spark_type, ser_name=ser_name, timezone=timezone, - struct_in_pandas=struct_in_pandas, ndarray_as_list=ndarray_as_list, prefer_int_ext_dtype=prefer_int_ext_dtype, df_for_struct=df_for_struct, @@ -1584,6 +1584,42 @@ def convert_legacy( ) return converter(ser) + @staticmethod + def _ndarray_to_list(v: "np.ndarray") -> list: + """Recursively convert numpy ndarrays to Python lists.""" + import numpy as np + + return [ + ArrowArrayToPandasConversion._ndarray_to_list(x) if isinstance(x, np.ndarray) else x + for x in v + ] + + @staticmethod + def _contains_conversion_type(data_type: DataType) -> bool: + """ + Check if data type tree contains types that require post-processing conversion. + + Returns True if the type contains UserDefinedType, VariantType, GeographyType, + or GeometryType at any nesting level. + """ + if isinstance( + data_type, + (UserDefinedType, VariantType, GeographyType, GeometryType), + ): + return True + elif isinstance(data_type, ArrayType): + return ArrowArrayToPandasConversion._contains_conversion_type(data_type.elementType) + elif isinstance(data_type, MapType): + return ArrowArrayToPandasConversion._contains_conversion_type( + data_type.keyType + ) or ArrowArrayToPandasConversion._contains_conversion_type(data_type.valueType) + elif isinstance(data_type, StructType): + return any( + ArrowArrayToPandasConversion._contains_conversion_type(f.dataType) + for f in data_type.fields + ) + return False + @classmethod def _prefer_convert_numpy( cls, @@ -1611,8 +1647,14 @@ def _prefer_convert_numpy( ) if df_for_struct and isinstance(spark_type, StructType): return all(isinstance(f.dataType, supported_types) for f in spark_type.fields) + elif isinstance(spark_type, supported_types): + return True + elif isinstance(spark_type, ArrayType): + return not cls._contains_conversion_type(spark_type) + # elif isinstance(spark_type, (MapType, StructType)): + # TODO: Support MapType, StructType else: - return isinstance(spark_type, supported_types) + return False @classmethod def convert_numpy( @@ -1622,7 +1664,6 @@ def convert_numpy( *, ser_name: Optional[str] = None, timezone: Optional[str] = None, - struct_in_pandas: Optional[str] = None, ndarray_as_list: bool = False, prefer_int_ext_dtype: bool = False, df_for_struct: bool = False, @@ -1645,7 +1686,6 @@ def convert_numpy( spark_type=field.dataType, ser_name=field.name, timezone=timezone, - struct_in_pandas=struct_in_pandas, ndarray_as_list=ndarray_as_list, prefer_int_ext_dtype=prefer_int_ext_dtype, df_for_struct=False, # always False for child fields @@ -1662,7 +1702,14 @@ def convert_numpy( # This name will be dropped after pa.compute functions. ser_name = arr._name - arr = ArrowArrayConversion.preprocess_time(arr) + if isinstance(spark_type, ArrayType): + # ArrayType only needs tz localization, not ns coercion. 
+ # preprocess_time() coerces to timestamp[ns] which causes PyArrow's + # to_pandas() to return raw integers for nested timestamps + # instead of datetime objects. + arr = ArrowArrayConversion.localize_tz(arr) + else: + arr = ArrowArrayConversion.preprocess_time(arr) series: pd.Series @@ -1731,15 +1778,14 @@ def convert_numpy( series = series.map( lambda v: Geometry.fromWKB(v["wkb"], v["srid"]) if v is not None else None ) - # elif isinstance( - # spark_type, - # ( - # ArrayType, - # MapType, - # StructType, - # ), - # ): - # TODO(SPARK-55324): Support complex types + elif isinstance(spark_type, ArrayType): + if ndarray_as_list: + series = arr.to_pandas(date_as_object=True, integer_object_nulls=True) + series = series.map(lambda x: cls._ndarray_to_list(x) if x is not None else None) + else: + series = arr.to_pandas(date_as_object=True) + # elif isinstance(spark_type, (MapType, StructType)): + # TODO: Support MapType, StructType else: # pragma: no cover assert False, f"Need converter for {spark_type} but failed to find one." diff --git a/python/pyspark/sql/tests/test_conversion.py b/python/pyspark/sql/tests/test_conversion.py index 261b81a407b55..a92db3e2de6ca 100644 --- a/python/pyspark/sql/tests/test_conversion.py +++ b/python/pyspark/sql/tests/test_conversion.py @@ -44,6 +44,7 @@ StringType, StructField, StructType, + TimestampNTZType, TimestampType, UserDefinedType, VariantType, @@ -736,6 +737,102 @@ def test_geometry_convert_numpy(self): ) self.assertEqual(len(result), 0) + def test_array_convert_numpy(self): + import pyarrow as pa + import numpy as np + + arr = pa.array([[1, 2, 3], [4, 5]], type=pa.list_(pa.int64())) + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(IntegerType())) + self.assertIsInstance(result.iloc[0], np.ndarray) + self.assertEqual(list(result.iloc[0]), [1, 2, 3]) + self.assertEqual(list(result.iloc[1]), [4, 5]) + + # empty inner arrays + arr = pa.array([[], [1, 2], []], type=pa.list_(pa.int64())) + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(IntegerType())) + self.assertEqual(len(result.iloc[0]), 0) + self.assertEqual(list(result.iloc[1]), [1, 2]) + + # nulls + arr = pa.array([[1, None, 3], None, [4, 5]], type=pa.list_(pa.int64())) + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(IntegerType())) + self.assertTrue(np.isnan(result.iloc[0][1])) + self.assertIsNone(result.iloc[1]) + + # nested arrays + arr = pa.array([[[1, 2], [3]], [[4, 5]]], type=pa.list_(pa.list_(pa.int64()))) + result = ArrowArrayToPandasConversion.convert_numpy( + arr, ArrayType(ArrayType(IntegerType())) + ) + self.assertIsInstance(result.iloc[0], np.ndarray) + self.assertEqual(list(result.iloc[0][0]), [1, 2]) + self.assertEqual(list(result.iloc[0][1]), [3]) + + def test_array_with_timestamps(self): + import pyarrow as pa + import numpy as np + + # tz-aware timestamps: localize_tz strips tz without ns coercion + ts1 = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=ZoneInfo("UTC")) + ts2 = datetime.datetime(2024, 6, 15, 8, 30, tzinfo=ZoneInfo("UTC")) + arr = pa.array([[ts1, ts2]], type=pa.list_(pa.timestamp("us", tz="UTC"))) + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampType())) + self.assertIsInstance(result.iloc[0], np.ndarray) + self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T12:00:00", "us")) + self.assertEqual(result.iloc[0][1], np.datetime64("2024-06-15T08:30:00", "us")) + + # tz-naive timestamps + arr = pa.array( + [[datetime.datetime(2024, 1, 1), datetime.datetime(2024, 6, 15)]], + 
type=pa.list_(pa.timestamp("us")), + ) + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampNTZType())) + self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T00:00:00", "us")) + + # out-of-ns-range timestamps: preprocess_time would overflow, localize_tz does not + arr = pa.array( + [[datetime.datetime(1000, 1, 1), datetime.datetime(2024, 1, 1)]], + type=pa.list_(pa.timestamp("us")), + ) + # Verify preprocess_time would overflow due to ns coercion + with self.assertRaises(pa.lib.ArrowInvalid): + ArrowArrayConversion.preprocess_time(arr) + # But localize_tz succeeds (no ns coercion) + result_arr = ArrowArrayConversion.localize_tz(arr) + self.assertEqual(result_arr.type, pa.list_(pa.timestamp("us"))) + # And convert_numpy (which uses localize_tz) succeeds + result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampNTZType())) + self.assertIsInstance(result.iloc[0], np.ndarray) + self.assertEqual(len(result.iloc[0]), 2) + + def test_array_ndarray_as_list(self): + import pyarrow as pa + + arr = pa.array([[1, 2, 3], [4, 5]], type=pa.list_(pa.int64())) + result = ArrowArrayToPandasConversion.convert_numpy( + arr, ArrayType(IntegerType()), ndarray_as_list=True + ) + self.assertIsInstance(result.iloc[0], list) + self.assertEqual(result.iloc[0], [1, 2, 3]) + + # nulls preserved as None (not NaN) + arr = pa.array([[1, None, 3], None], type=pa.list_(pa.int64())) + result = ArrowArrayToPandasConversion.convert_numpy( + arr, ArrayType(IntegerType()), ndarray_as_list=True + ) + self.assertIsInstance(result.iloc[0], list) + self.assertIsNone(result.iloc[0][1]) + self.assertIsNone(result.iloc[1]) + + # nested arrays recursively converted to lists + arr = pa.array([[[1, 2], [3]]], type=pa.list_(pa.list_(pa.int64()))) + result = ArrowArrayToPandasConversion.convert_numpy( + arr, ArrayType(ArrayType(IntegerType())), ndarray_as_list=True + ) + self.assertIsInstance(result.iloc[0], list) + self.assertIsInstance(result.iloc[0][0], list) + self.assertEqual(result.iloc[0][0], [1, 2]) + if __name__ == "__main__": from pyspark.testing import main From db508b2de8faa9e6b6b8a17979bb9aa4f33b4ce3 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Mon, 9 Mar 2026 14:56:28 -0700 Subject: [PATCH 2/4] fix array of map and struct --- python/pyspark/sql/conversion.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py index 1de582dd36186..73fd0029275ae 100644 --- a/python/pyspark/sql/conversion.py +++ b/python/pyspark/sql/conversion.py @@ -1600,24 +1600,17 @@ def _contains_conversion_type(data_type: DataType) -> bool: Check if data type tree contains types that require post-processing conversion. Returns True if the type contains UserDefinedType, VariantType, GeographyType, - or GeometryType at any nesting level. + GeometryType, MapType, or StructType at any nesting level. + MapType and StructType require conversion because PyArrow's to_pandas() produces + maps as lists of tuples (not dicts) and structs as dicts (not Rows). 
""" if isinstance( data_type, - (UserDefinedType, VariantType, GeographyType, GeometryType), + (UserDefinedType, VariantType, GeographyType, GeometryType, MapType, StructType), ): return True elif isinstance(data_type, ArrayType): return ArrowArrayToPandasConversion._contains_conversion_type(data_type.elementType) - elif isinstance(data_type, MapType): - return ArrowArrayToPandasConversion._contains_conversion_type( - data_type.keyType - ) or ArrowArrayToPandasConversion._contains_conversion_type(data_type.valueType) - elif isinstance(data_type, StructType): - return any( - ArrowArrayToPandasConversion._contains_conversion_type(f.dataType) - for f in data_type.fields - ) return False @classmethod From 3488262d39ab673cc4bb7c5e4f034433c189ad2b Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 12 Mar 2026 13:50:31 -0700 Subject: [PATCH 3/4] update --- python/pyspark/sql/conversion.py | 13 +++-------- python/pyspark/sql/tests/test_conversion.py | 26 ++++----------------- 2 files changed, 8 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py index 713315ad72f34..659cf8b4c0bef 100644 --- a/python/pyspark/sql/conversion.py +++ b/python/pyspark/sql/conversion.py @@ -1772,14 +1772,7 @@ def convert_numpy( # This name will be dropped after pa.compute functions. ser_name = arr._name - if isinstance(spark_type, ArrayType): - # ArrayType only needs tz localization, not ns coercion. - # preprocess_time() coerces to timestamp[ns] which causes PyArrow's - # to_pandas() to return raw integers for nested timestamps - # instead of datetime objects. - arr = ArrowArrayConversion.localize_tz(arr) - else: - arr = ArrowArrayConversion.preprocess_time(arr) + arr = ArrowArrayConversion.preprocess_time(arr) series: pd.Series @@ -1850,10 +1843,10 @@ def convert_numpy( ) elif isinstance(spark_type, ArrayType): if ndarray_as_list: - series = arr.to_pandas(date_as_object=True, integer_object_nulls=True) + series = arr.to_pandas(integer_object_nulls=True) series = series.map(lambda x: cls._ndarray_to_list(x) if x is not None else None) else: - series = arr.to_pandas(date_as_object=True) + series = arr.to_pandas() # elif isinstance(spark_type, (MapType, StructType)): # TODO: Support MapType, StructType else: # pragma: no cover diff --git a/python/pyspark/sql/tests/test_conversion.py b/python/pyspark/sql/tests/test_conversion.py index e465c24dab3a4..38145a69d5dd1 100644 --- a/python/pyspark/sql/tests/test_conversion.py +++ b/python/pyspark/sql/tests/test_conversion.py @@ -755,7 +755,7 @@ def test_array_convert_numpy(self): self.assertEqual(len(result.iloc[0]), 0) self.assertEqual(list(result.iloc[1]), [1, 2]) - # nulls + # nulls: inner nulls become NaN (float64) to preserve numeric ndarray dtype arr = pa.array([[1, None, 3], None, [4, 5]], type=pa.list_(pa.int64())) result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(IntegerType())) self.assertTrue(np.isnan(result.iloc[0][1])) @@ -774,14 +774,14 @@ def test_array_with_timestamps(self): import pyarrow as pa import numpy as np - # tz-aware timestamps: localize_tz strips tz without ns coercion + # tz-aware timestamps: preprocess_time strips tz and coerces to ns ts1 = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=ZoneInfo("UTC")) ts2 = datetime.datetime(2024, 6, 15, 8, 30, tzinfo=ZoneInfo("UTC")) arr = pa.array([[ts1, ts2]], type=pa.list_(pa.timestamp("us", tz="UTC"))) result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampType())) self.assertIsInstance(result.iloc[0], np.ndarray) 
- self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T12:00:00", "us")) - self.assertEqual(result.iloc[0][1], np.datetime64("2024-06-15T08:30:00", "us")) + self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T12:00:00", "ns")) + self.assertEqual(result.iloc[0][1], np.datetime64("2024-06-15T08:30:00", "ns")) # tz-naive timestamps arr = pa.array( @@ -789,23 +789,7 @@ def test_array_with_timestamps(self): type=pa.list_(pa.timestamp("us")), ) result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampNTZType())) - self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T00:00:00", "us")) - - # out-of-ns-range timestamps: preprocess_time would overflow, localize_tz does not - arr = pa.array( - [[datetime.datetime(1000, 1, 1), datetime.datetime(2024, 1, 1)]], - type=pa.list_(pa.timestamp("us")), - ) - # Verify preprocess_time would overflow due to ns coercion - with self.assertRaises(pa.lib.ArrowInvalid): - ArrowArrayConversion.preprocess_time(arr) - # But localize_tz succeeds (no ns coercion) - result_arr = ArrowArrayConversion.localize_tz(arr) - self.assertEqual(result_arr.type, pa.list_(pa.timestamp("us"))) - # And convert_numpy (which uses localize_tz) succeeds - result = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(TimestampNTZType())) - self.assertIsInstance(result.iloc[0], np.ndarray) - self.assertEqual(len(result.iloc[0]), 2) + self.assertEqual(result.iloc[0][0], np.datetime64("2024-01-01T00:00:00", "ns")) def test_array_ndarray_as_list(self): import pyarrow as pa From 7aac03c483d7f7ab553df9ec265b9829a0375a07 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 12 Mar 2026 14:14:21 -0700 Subject: [PATCH 4/4] revert struct_in_pandas --- python/pyspark/sql/conversion.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py index 659cf8b4c0bef..4c30630090c38 100644 --- a/python/pyspark/sql/conversion.py +++ b/python/pyspark/sql/conversion.py @@ -1548,6 +1548,7 @@ def convert( spark_type, ser_name=ser_name, timezone=timezone, + struct_in_pandas=struct_in_pandas, ndarray_as_list=ndarray_as_list, prefer_int_ext_dtype=prefer_int_ext_dtype, df_for_struct=df_for_struct, @@ -1734,6 +1735,7 @@ def convert_numpy( *, ser_name: Optional[str] = None, timezone: Optional[str] = None, + struct_in_pandas: Optional[str] = None, ndarray_as_list: bool = False, prefer_int_ext_dtype: bool = False, df_for_struct: bool = False, @@ -1756,6 +1758,7 @@ def convert_numpy( spark_type=field.dataType, ser_name=field.name, timezone=timezone, + struct_in_pandas=struct_in_pandas, ndarray_as_list=ndarray_as_list, prefer_int_ext_dtype=prefer_int_ext_dtype, df_for_struct=False, # always False for child fields
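
A minimal usage sketch of the ArrayType path as it stands after PATCH 3 and 4.
This is illustrative only: it assumes ArrowArrayToPandasConversion is importable
from pyspark.sql.conversion as in the test file above, and it exercises only
behavior the new tests assert.

    import pyarrow as pa
    from pyspark.sql.conversion import ArrowArrayToPandasConversion
    from pyspark.sql.types import ArrayType, IntegerType

    arr = pa.array([[1, None, 3], None], type=pa.list_(pa.int64()))

    # Default path: non-null elements come back as numpy.ndarray; inner nulls
    # surface as NaN because the ndarray is coerced to float64.
    ser = ArrowArrayToPandasConversion.convert_numpy(arr, ArrayType(IntegerType()))
    print(type(ser.iloc[0]))  # <class 'numpy.ndarray'>
    print(ser.iloc[1])        # None (top-level null rows stay None)

    # ndarray_as_list=True: elements are recursively converted to Python lists
    # via _ndarray_to_list, and inner nulls are preserved as None
    # (integer_object_nulls=True avoids the NaN coercion).
    ser = ArrowArrayToPandasConversion.convert_numpy(
        arr, ArrayType(IntegerType()), ndarray_as_list=True
    )
    print(ser.iloc[0])  # [1, None, 3]

Note the design choice settled in PATCH 3: nested timestamps go through
preprocess_time() just like top-level ones, so array elements of TimestampType
come back as datetime64[ns] values rather than the us-precision values the
first revision's localize_tz() path produced.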