diff --git a/pypaimon/api/read_builder.py b/pypaimon/api/read_builder.py
index 68b7d46..cd5715b 100644
--- a/pypaimon/api/read_builder.py
+++ b/pypaimon/api/read_builder.py
@@ -20,6 +20,8 @@
 from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder
 from typing import List
 
+from pypaimon.api.row_type import RowType
+
 
 class ReadBuilder(ABC):
     """An interface for building the TableScan and TableRead."""
@@ -50,3 +52,10 @@ def new_read(self) -> TableRead:
     @abstractmethod
     def new_predicate_builder(self) -> PredicateBuilder:
         """Create a builder for Predicate."""
+
+    @abstractmethod
+    def read_type(self) -> RowType:
+        """
+        Return the row type of the builder. If there is a projection inside
+        the builder, the row type will only contain the selected fields.
+        """
diff --git a/pypaimon/api/row_type.py b/pypaimon/api/row_type.py
new file mode 100644
index 0000000..b15c413
--- /dev/null
+++ b/pypaimon/api/row_type.py
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+
+from abc import ABC, abstractmethod
+
+
+class RowType(ABC):
+    """Data type of a sequence of fields."""
+
+    @abstractmethod
+    def as_arrow(self) -> "pa.Schema":
+        """Return the row type as an Arrow schema."""
diff --git a/pypaimon/api/split.py b/pypaimon/api/split.py
index 386c72b..5b9115c 100644
--- a/pypaimon/api/split.py
+++ b/pypaimon/api/split.py
@@ -16,8 +16,22 @@
 # limitations under the License.
 ################################################################################
 
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from typing import List
 
 
 class Split(ABC):
     """An input split for reading. The most important subclass is DataSplit."""
+
+    @abstractmethod
+    def row_count(self) -> int:
+        """Return the total row count of the split."""
+
+    @abstractmethod
+    def file_size(self) -> int:
+        """Return the total file size of the split."""
+
+    @abstractmethod
+    def file_paths(self) -> List[str]:
+        """Return the paths of all raw files in the split."""
diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index ce90bc5..9f378b7 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -24,7 +24,7 @@
 from pypaimon.py4j.java_gateway import get_gateway
 from pypaimon.py4j.util import java_utils, constants
 from pypaimon.api import \
-    (catalog, table, read_builder, table_scan, split,
+    (catalog, table, read_builder, table_scan, split, row_type,
      table_read, write_builder, table_write, commit_message,
      table_commit, Schema, predicate)
 from typing import List, Iterator, Optional, Any, TYPE_CHECKING
@@ -115,6 +115,18 @@ def new_read(self) -> 'TableRead':
 
     def new_predicate_builder(self) -> 'PredicateBuilder':
         return PredicateBuilder(self._j_row_type)
 
+    def read_type(self) -> 'RowType':
+        return RowType(self._j_read_builder.readType())
+
+
+class RowType(row_type.RowType):
+
+    def __init__(self, j_row_type):
+        self._j_row_type = j_row_type
+
+    def as_arrow(self) -> "pa.Schema":
+        return java_utils.to_arrow_schema(self._j_row_type)
+
 
 class TableScan(table_scan.TableScan):
@@ -144,6 +156,23 @@ def __init__(self, j_split):
 
     def to_j_split(self):
         return self._j_split
 
+    def row_count(self) -> int:
+        return self._j_split.rowCount()
+
+    def file_size(self) -> int:
+        files_optional = self._j_split.convertToRawFiles()
+        if not files_optional.isPresent():
+            return 0
+        files = files_optional.get()
+        return sum(file.length() for file in files)
+
+    def file_paths(self) -> List[str]:
+        files_optional = self._j_split.convertToRawFiles()
+        if not files_optional.isPresent():
+            return []
+        files = files_optional.get()
+        return [file.path() for file in files]
+
 
 class TableRead(table_read.TableRead):
diff --git a/pypaimon/py4j/tests/test_object_metadata.py b/pypaimon/py4j/tests/test_object_metadata.py
new file mode 100644
index 0000000..e3591c9
--- /dev/null
+++ b/pypaimon/py4j/tests/test_object_metadata.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.py4j.tests import PypaimonTestBase
+
+
+class ObjectInfoTest(PypaimonTestBase):
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+
+    def test_read_type_metadata(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_read_type_metadata', schema, False)
+        table = self.catalog.get_table('default.test_read_type_metadata')
+
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(['f1'])
+        pa_schema = read_builder.read_type().as_arrow()
+
+        self.assertEqual(len(pa_schema.names), 1)
+        self.assertEqual(pa_schema.names[0], 'f1')
+
+    def test_split_metadata(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_split_metadata', schema, False)
+        table = self.catalog.get_table('default.test_split_metadata')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = {
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['a', 'b', 'c', 'd', 'e'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.simple_pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+
+        self.assertEqual(len(splits), 1)
+        self.assertEqual(len(splits[0].file_paths()), 1)
+        self.assertEqual(splits[0].row_count(), 5)
+        self.assertTrue(splits[0].file_paths()[0].endswith('.parquet'))
+        self.assertEqual(splits[0].file_size(), os.path.getsize(splits[0].file_paths()[0]))
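
Usage note (illustrative, not part of the patch): a minimal sketch of how ReadBuilder.read_type() added above could be used to inspect the projected schema before planning a scan. It assumes a `table` handle already obtained from a catalog; the table and field names are hypothetical.

# `table` is assumed to come from an existing catalog, e.g.
# table = catalog.get_table('default.my_table'), with a string field 'f1'.
read_builder = table.new_read_builder()
read_builder.with_projection(['f1'])

# read_type() reflects the projection, so only the selected field appears.
projected_schema = read_builder.read_type().as_arrow()
print(projected_schema.names)  # expected: ['f1']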
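A similar sketch for the new Split metadata accessors, e.g. to estimate the size of a scan before reading it. Again `table` is assumed to exist, and the printed numbers depend on the actual data.

# Aggregate metadata over all splits of a plan without reading any rows.
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()

total_rows = sum(split.row_count() for split in splits)
total_bytes = sum(split.file_size() for split in splits)  # 0 if no raw files
paths = [path for split in splits for path in split.file_paths()]
print(f'{total_rows} rows in {len(paths)} raw files, {total_bytes} bytes')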