diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index e94820b..195783f 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -43,7 +43,14 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
+      - name: Set up hadoop dependency
+        run: |
+          mkdir -p ${{ github.workspace }}/temp
+          curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \
+            https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
       - name: Run lint-python.sh
+        env:
+          _PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar
         run: |
           chmod +x dev/lint-python.sh
           ./dev/lint-python.sh
diff --git a/paimon_python_api/__init__.py b/paimon_python_api/__init__.py
index 86090c9..44717bf 100644
--- a/paimon_python_api/__init__.py
+++ b/paimon_python_api/__init__.py
@@ -19,6 +19,7 @@
 from .split import Split
 from .table_read import TableRead
 from .table_scan import TableScan, Plan
+from .predicate import Predicate, PredicateBuilder
 from .read_builder import ReadBuilder
 from .commit_message import CommitMessage
 from .table_commit import BatchTableCommit
@@ -39,5 +40,7 @@
     'BatchWriteBuilder',
     'Table',
     'Schema',
-    'Catalog'
+    'Catalog',
+    'Predicate',
+    'PredicateBuilder'
 ]
diff --git a/paimon_python_api/predicate.py b/paimon_python_api/predicate.py
new file mode 100644
index 0000000..46280d1
--- /dev/null
+++ b/paimon_python_api/predicate.py
@@ -0,0 +1,95 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC, abstractmethod
+from typing import Any, List
+
+
+class Predicate(ABC):
+    """Predicate which evaluates to a boolean. It currently declares no
+    methods because only paimon_python_java implements it, and the Java
+    implementation converts it to a Java object."""
+
+
+class PredicateBuilder(ABC):
+    """A utility class to create Predicate objects for common filter conditions."""
+
+    @abstractmethod
+    def equal(self, field: str, literal: Any) -> Predicate:
+        """field = literal"""
+
+    @abstractmethod
+    def not_equal(self, field: str, literal: Any) -> Predicate:
+        """field <> literal"""
+
+    @abstractmethod
+    def less_than(self, field: str, literal: Any) -> Predicate:
+        """field < literal"""
+
+    @abstractmethod
+    def less_or_equal(self, field: str, literal: Any) -> Predicate:
+        """field <= literal"""
+
+    @abstractmethod
+    def greater_than(self, field: str, literal: Any) -> Predicate:
+        """field > literal"""
+
+    @abstractmethod
+    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+        """field >= literal"""
+
+    @abstractmethod
+    def is_null(self, field: str) -> Predicate:
+        """field IS NULL"""
+
+    @abstractmethod
+    def is_not_null(self, field: str) -> Predicate:
+        """field IS NOT NULL"""
+
+    @abstractmethod
+    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """field.startswith(pattern_literal)"""
+
+    @abstractmethod
+    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """field.endswith(pattern_literal)"""
+
+    @abstractmethod
+    def contains(self, field: str, pattern_literal: Any) -> Predicate:
+        """pattern_literal in field"""
+
+    @abstractmethod
+    def is_in(self, field: str, literals: List[Any]) -> Predicate:
+        """field IN literals"""
+
+    @abstractmethod
+    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+        """field NOT IN literals"""
+
+    @abstractmethod
+    def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+            -> Predicate:
+        """field BETWEEN included_lower_bound AND included_upper_bound"""
+
+    @abstractmethod
+    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """predicate1 AND predicate2 AND ..."""
+
+    @abstractmethod
+    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """predicate1 OR predicate2 OR ..."""
diff --git a/paimon_python_api/read_builder.py b/paimon_python_api/read_builder.py
index 94ec073..ad5e6d6 100644
--- a/paimon_python_api/read_builder.py
+++ b/paimon_python_api/read_builder.py
@@ -17,13 +17,20 @@
 #################################################################################
 
 from abc import ABC, abstractmethod
-from paimon_python_api import TableRead, TableScan
+from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder
 from typing import List
 
 
 class ReadBuilder(ABC):
     """An interface for building the TableScan and TableRead."""
 
+    @abstractmethod
+    def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
+        """
+        Push down filters. Data is filtered as much as possible, but the
+        result is not guaranteed to be completely filtered.
+        """
+
     @abstractmethod
     def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
         """Push nested projection."""
@@ -39,3 +46,7 @@ def new_scan(self) -> TableScan:
     @abstractmethod
     def new_read(self) -> TableRead:
         """Create a TableRead to read splits."""
+
+    @abstractmethod
+    def new_predicate_builder(self) -> PredicateBuilder:
+        """Create a builder for Predicate."""
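For orientation before the implementation below, here is a minimal usage sketch of the two new abstractions. It assumes an already-opened `table`; see the tests later in this diff for a complete, runnable setup:

```python
# Hedged sketch: `table` is assumed to come from catalog.get_table(...).
read_builder = table.new_read_builder()
builder = read_builder.new_predicate_builder()

# f0 > 1 AND f1 startswith 'ab'
predicate = builder.and_predicates([
    builder.greater_than('f0', 1),
    builder.startswith('f1', 'ab'),
])

read_builder = read_builder.with_filter(predicate)
splits = read_builder.new_scan().plan().splits()
df = read_builder.new_read().to_pandas(splits)
```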
+ """ + @abstractmethod def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': """Push nested projection.""" @@ -39,3 +46,7 @@ def new_scan(self) -> TableScan: @abstractmethod def new_read(self) -> TableRead: """Create a TableRead to read splits.""" + + @abstractmethod + def new_predicate_builder(self) -> PredicateBuilder: + """Create a builder for Predicate.""" diff --git a/paimon_python_java/__init__.py b/paimon_python_java/__init__.py index 6e97d9e..9b0d002 100644 --- a/paimon_python_java/__init__.py +++ b/paimon_python_java/__init__.py @@ -18,7 +18,8 @@ from .util import constants from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead, - BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit) + BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit, + Predicate, PredicateBuilder) __all__ = [ 'constants', @@ -32,5 +33,7 @@ 'BatchWriteBuilder', 'BatchTableWrite', 'CommitMessage', - 'BatchTableCommit' + 'BatchTableCommit', + 'Predicate', + 'PredicateBuilder' ] diff --git a/paimon_python_java/gateway_server.py b/paimon_python_java/gateway_server.py index 7285e98..2061d59 100644 --- a/paimon_python_java/gateway_server.py +++ b/paimon_python_java/gateway_server.py @@ -103,7 +103,7 @@ def _get_hadoop_classpath(env): return env[constants.PYPAIMON_HADOOP_CLASSPATH] if 'HADOOP_CLASSPATH' in env: - return None + return env['HADOOP_CLASSPATH'] else: raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \ and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.") diff --git a/paimon_python_java/java_gateway.py b/paimon_python_java/java_gateway.py index f2b1621..3dabcfd 100644 --- a/paimon_python_java/java_gateway.py +++ b/paimon_python_java/java_gateway.py @@ -109,6 +109,7 @@ def import_paimon_view(gateway): java_import(gateway.jvm, 'org.apache.paimon.types.*') java_import(gateway.jvm, 'org.apache.paimon.python.*') java_import(gateway.jvm, "org.apache.paimon.data.*") + java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder") class Watchdog(object): diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java new file mode 100644 index 0000000..a863dfd --- /dev/null +++ b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.python; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +import java.util.List; +import java.util.stream.Collectors; + +/** For building Predicate. */ +public class PredicationUtil { + + public static Predicate build( + RowType rowType, + PredicateBuilder builder, + String method, + int index, + List literals) { + literals = + literals.stream() + .map(l -> convertJavaObject(rowType.getTypeAt(index), l)) + .collect(Collectors.toList()); + switch (method) { + case "equal": + return builder.equal(index, literals.get(0)); + case "notEqual": + return builder.notEqual(index, literals.get(0)); + case "lessThan": + return builder.lessThan(index, literals.get(0)); + case "lessOrEqual": + return builder.lessOrEqual(index, literals.get(0)); + case "greaterThan": + return builder.greaterThan(index, literals.get(0)); + case "greaterOrEqual": + return builder.greaterOrEqual(index, literals.get(0)); + case "isNull": + return builder.isNull(index); + case "isNotNull": + return builder.isNotNull(index); + case "startsWith": + return builder.startsWith(index, literals.get(0)); + case "endsWith": + return builder.endsWith(index, literals.get(0)); + case "contains": + return builder.contains(index, literals.get(0)); + case "in": + return builder.in(index, literals); + case "notIn": + return builder.notIn(index, literals); + case "between": + return builder.between(index, literals.get(0), literals.get(1)); + default: + throw new UnsupportedOperationException( + "Unknown PredicateBuilder method " + method); + } + } + + /** Some type is not convenient to transfer from Python to Java. 
*/ + private static Object convertJavaObject(DataType literalType, Object literal) { + switch (literalType.getTypeRoot()) { + case BOOLEAN: + case DOUBLE: + case INTEGER: + return literal; + case CHAR: + case VARCHAR: + return BinaryString.fromString((String) literal); + case FLOAT: + return ((Number) literal).floatValue(); + case TINYINT: + return ((Number) literal).byteValue(); + case SMALLINT: + return ((Number) literal).shortValue(); + case BIGINT: + return ((Number) literal).longValue(); + default: + throw new UnsupportedOperationException( + "Unsupported predicate leaf type " + literalType.getTypeRoot().name()); + } + } + + public static Predicate buildAnd(List predicates) { + // 'and' is keyword of Python + return PredicateBuilder.and(predicates); + } + + public static Predicate buildOr(List predicates) { + // 'or' is keyword of Python + return PredicateBuilder.or(predicates); + } +} diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index fcf0695..0d3101b 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -22,8 +22,9 @@ from paimon_python_java.java_gateway import get_gateway from paimon_python_java.util import java_utils, constants from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, - write_builder, table_write, commit_message, table_commit, Schema) -from typing import List, Iterator, Optional + write_builder, table_write, commit_message, table_commit, Schema, + predicate) +from typing import List, Iterator, Optional, Any class Catalog(catalog.Catalog): @@ -85,6 +86,10 @@ def __init__(self, j_read_builder, j_row_type, catalog_options: dict, arrow_sche self._catalog_options = catalog_options self._arrow_schema = arrow_schema + def with_filter(self, predicate: 'Predicate'): + self._j_read_builder.withFilter(predicate.to_j_predicate()) + return self + def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': self._j_read_builder.withProjection(projection) return self @@ -98,9 +103,12 @@ def new_scan(self) -> 'TableScan': return TableScan(j_table_scan) def new_read(self) -> 'TableRead': - j_table_read = self._j_read_builder.newRead() + j_table_read = self._j_read_builder.newRead().executeFilter() return TableRead(j_table_read, self._j_row_type, self._catalog_options, self._arrow_schema) + def new_predicate_builder(self) -> 'PredicateBuilder': + return PredicateBuilder(self._j_row_type) + class TableScan(table_scan.TableScan): @@ -257,3 +265,92 @@ def commit(self, commit_messages: List[CommitMessage]): def close(self): self._j_batch_table_commit.close() + + +class Predicate(predicate.Predicate): + + def __init__(self, j_predicate): + self._j_predicate = j_predicate + + def to_j_predicate(self): + return self._j_predicate + + +class PredicateBuilder(predicate.PredicateBuilder): + + def __init__(self, j_row_type): + self._field_names = j_row_type.getFieldNames() + self._j_row_type = j_row_type + self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type) + + def _build(self, method: str, field: str, literals: Optional[List[Any]] = None): + error = ValueError(f'The field {field} is not in field list {self._field_names}.') + try: + index = self._field_names.index(field) + if index == -1: + raise error + except ValueError: + raise error + + if literals is None: + literals = [] + + j_predicate = get_gateway().jvm.PredicationUtil.build( + self._j_row_type, + self._j_predicate_builder, + method, + index, + literals + ) + return Predicate(j_predicate) + + def 
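The literal conversion above is what lets Python callers pass plain ints, floats, strs, and bools regardless of column width. A hedged sketch of the mapping, using the typed column names from the tests later in this diff and a `builder` obtained via `new_predicate_builder()`:

```python
# Each literal is narrowed/widened by PredicationUtil.convertJavaObject:
builder.equal('_tinyint', 1)      # int   -> byteValue()   for TINYINT
builder.equal('_smallint', 10)    # int   -> shortValue()  for SMALLINT
builder.equal('_bigint', 2000)    # int   -> longValue()   for BIGINT
builder.equal('_float32', 2.0)    # float -> floatValue()  for FLOAT
builder.equal('_string', 'B')     # str   -> BinaryString  for CHAR/VARCHAR
builder.equal('_boolean', True)   # BOOLEAN/DOUBLE/INTEGER pass through unchanged
```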
diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py
index fcf0695..0d3101b 100644
--- a/paimon_python_java/pypaimon.py
+++ b/paimon_python_java/pypaimon.py
@@ -22,8 +22,9 @@
 from paimon_python_java.java_gateway import get_gateway
 from paimon_python_java.util import java_utils, constants
 from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
-                               write_builder, table_write, commit_message, table_commit, Schema)
-from typing import List, Iterator, Optional
+                               write_builder, table_write, commit_message, table_commit, Schema,
+                               predicate)
+from typing import List, Iterator, Optional, Any
 
 
 class Catalog(catalog.Catalog):
@@ -85,6 +86,10 @@ def __init__(self, j_read_builder, j_row_type, catalog_options: dict, arrow_sche
         self._catalog_options = catalog_options
         self._arrow_schema = arrow_schema
 
+    def with_filter(self, predicate: 'Predicate') -> 'ReadBuilder':
+        self._j_read_builder.withFilter(predicate.to_j_predicate())
+        return self
+
     def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
         self._j_read_builder.withProjection(projection)
         return self
@@ -98,9 +103,12 @@ def new_scan(self) -> 'TableScan':
         return TableScan(j_table_scan)
 
     def new_read(self) -> 'TableRead':
-        j_table_read = self._j_read_builder.newRead()
+        # executeFilter() re-applies the pushed-down predicate per record, so
+        # rows the storage-level pushdown could not eliminate are dropped here.
+        j_table_read = self._j_read_builder.newRead().executeFilter()
         return TableRead(j_table_read, self._j_row_type, self._catalog_options, self._arrow_schema)
 
+    def new_predicate_builder(self) -> 'PredicateBuilder':
+        return PredicateBuilder(self._j_row_type)
+
 
 class TableScan(table_scan.TableScan):
@@ -257,3 +265,92 @@ def commit(self, commit_messages: List[CommitMessage]):
 
     def close(self):
         self._j_batch_table_commit.close()
+
+
+class Predicate(predicate.Predicate):
+
+    def __init__(self, j_predicate):
+        self._j_predicate = j_predicate
+
+    def to_j_predicate(self):
+        return self._j_predicate
+
+
+class PredicateBuilder(predicate.PredicateBuilder):
+
+    def __init__(self, j_row_type):
+        self._field_names = j_row_type.getFieldNames()
+        self._j_row_type = j_row_type
+        self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type)
+
+    def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
+        # _field_names is mirrored from the JVM; depending on the Py4J
+        # conversion, a missing field either raises ValueError or yields -1.
+        error = ValueError(f'The field {field} is not in field list {self._field_names}.')
+        try:
+            index = self._field_names.index(field)
+            if index == -1:
+                raise error
+        except ValueError:
+            raise error
+
+        if literals is None:
+            literals = []
+
+        j_predicate = get_gateway().jvm.PredicationUtil.build(
+            self._j_row_type,
+            self._j_predicate_builder,
+            method,
+            index,
+            literals
+        )
+        return Predicate(j_predicate)
+
+    def equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('equal', field, [literal])
+
+    def not_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('notEqual', field, [literal])
+
+    def less_than(self, field: str, literal: Any) -> Predicate:
+        return self._build('lessThan', field, [literal])
+
+    def less_or_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('lessOrEqual', field, [literal])
+
+    def greater_than(self, field: str, literal: Any) -> Predicate:
+        return self._build('greaterThan', field, [literal])
+
+    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('greaterOrEqual', field, [literal])
+
+    def is_null(self, field: str) -> Predicate:
+        return self._build('isNull', field)
+
+    def is_not_null(self, field: str) -> Predicate:
+        return self._build('isNotNull', field)
+
+    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('startsWith', field, [pattern_literal])
+
+    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('endsWith', field, [pattern_literal])
+
+    def contains(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('contains', field, [pattern_literal])
+
+    def is_in(self, field: str, literals: List[Any]) -> Predicate:
+        return self._build('in', field, literals)
+
+    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+        return self._build('notIn', field, literals)
+
+    def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+            -> Predicate:
+        return self._build('between', field, [included_lower_bound, included_upper_bound])
+
+    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        return Predicate(get_gateway().jvm.PredicationUtil.buildAnd(predicates))
+
+    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        return Predicate(get_gateway().jvm.PredicationUtil.buildOr(predicates))
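Note how the two changes above interact: `with_filter()` returns the builder for fluent use, and `new_read()` now calls `executeFilter()`, so reads only return rows that actually satisfy the predicate. A short sketch, reusing `table` and `builder` from the earlier example:

```python
predicate = builder.between('f0', 1, 3)
read_builder = table.new_read_builder().with_filter(predicate)
# executeFilter() guarantees the returned rows satisfy f0 BETWEEN 1 AND 3.
df = read_builder.new_read().to_pandas(read_builder.new_scan().plan().splits())
```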
diff --git a/paimon_python_java/tests/test_preicates.py b/paimon_python_java/tests/test_preicates.py
new file mode 100644
index 0000000..7ee1a91
--- /dev/null
+++ b/paimon_python_java/tests/test_preicates.py
@@ -0,0 +1,394 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+import random
+import pandas as pd
+import pyarrow as pa
+
+from paimon_python_api import Schema
+from paimon_python_java import Catalog
+from paimon_python_java.tests import utils
+from setup_utils import java_setuputils
+
+
+def _check_filtered_result(read_builder, expected_df):
+    scan = read_builder.new_scan()
+    read = read_builder.new_read()
+    actual_df = read.to_pandas(scan.plan().splits())
+    pd.testing.assert_frame_equal(
+        actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))
+
+
+# TODO: parquet currently has a bug
+def _random_format():
+    return random.choice(['avro', 'orc'])
+
+
+class PredicateTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        java_setuputils.setup_java_bridge()
+        cls.hadoop_path = tempfile.mkdtemp()
+        utils.setup_hadoop_bundle_jar(cls.hadoop_path)
+        cls.warehouse = tempfile.mkdtemp()
+
+        catalog = Catalog.create({'warehouse': cls.warehouse})
+        catalog.create_database('default', False)
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        catalog.create_table('default.test_append',
+                             Schema(pa_schema, options={'file.format': _random_format()}),
+                             False)
+        catalog.create_table('default.test_pk',
+                             Schema(pa_schema, primary_keys=['f0'],
+                                    options={'bucket': '1', 'file.format': _random_format()}),
+                             False)
+
+        df = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['abc', 'abbc', 'bc', 'd', None],
+        })
+
+        append_table = catalog.get_table('default.test_append')
+        write_builder = append_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        pk_table = catalog.get_table('default.test_pk')
+        write_builder = pk_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        cls.catalog = catalog
+        cls.df = df
+
+    @classmethod
+    def tearDownClass(cls):
+        java_setuputils.clean()
+        if os.path.exists(cls.hadoop_path):
+            shutil.rmtree(cls.hadoop_path)
+        if os.path.exists(cls.warehouse):
+            shutil.rmtree(cls.warehouse)
+
+    def testWrongFieldName(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        with self.assertRaises(ValueError) as e:
+            predicate_builder.equal('f2', 'a')
+        self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].")
+
+    def testAppendWithDuplicate(self):
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        self.catalog.create_table('default.test_append_with_duplicate', Schema(pa_schema), False)
+
+        df = pd.DataFrame({
+            'f0': [1, 1, 2, 2],
+            'f1': ['a', 'b', 'c', 'd'],
+        })
+
+        table = self.catalog.get_table('default.test_append_with_duplicate')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('f0', 1)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[0:1])
+
+        predicate = predicate_builder.equal('f0', 0)
+        read_builder = table.new_read_builder().with_filter(predicate)
+        scan = read_builder.new_scan()
+        read = read_builder.new_read()
+        actual_df = read.to_pandas(scan.plan().splits())
+        self.assertEqual(len(actual_df), 0)
+
+    def testAllFieldTypesWithEqual(self):
+        pa_schema = pa.schema([
+            # int
+            ('_tinyint', pa.int8()),
+            ('_smallint', pa.int16()),
+            ('_int', pa.int32()),
+            ('_bigint', pa.int64()),
+            # float
+            ('_float16', pa.float32()),  # NOTE: cannot write pa.float16() data into Paimon
+            ('_float32', pa.float32()),
+            ('_double', pa.float64()),
+            # string
+            ('_string', pa.string()),
+            # bool
+            ('_boolean', pa.bool_())
+        ])
+        self.catalog.create_table('default.test_all_field_types',
+                                  Schema(pa_schema, options={'file.format': _random_format()}),
+                                  False)
+        table = self.catalog.get_table('default.test_all_field_types')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+
+        df = pd.DataFrame({
+            '_tinyint': pd.Series([1, 2], dtype='int8'),
+            '_smallint': pd.Series([10, 20], dtype='int16'),
+            '_int': pd.Series([100, 200], dtype='int32'),
+            '_bigint': pd.Series([1000, 2000], dtype='int64'),
+            '_float16': pd.Series([1.0, 2.0], dtype='float16'),
+            '_float32': pd.Series([1.00, 2.00], dtype='float32'),
+            '_double': pd.Series([1.000, 2.000], dtype='double'),
+            '_string': pd.Series(['A', 'B'], dtype='object'),
+            '_boolean': [True, False]
+        })
+        record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
+        # prepare for assertion
+        df['_float16'] = df['_float16'].astype('float32')
+
+        write.write_arrow_batch(record_batch)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('_tinyint', 1)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+        predicate = predicate_builder.equal('_smallint', 20)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+        predicate = predicate_builder.equal('_int', 100)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+        predicate = predicate_builder.equal('_bigint', 2000)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+        predicate = predicate_builder.equal('_float16', 1.0)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+        predicate = predicate_builder.equal('_float32', 2.00)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+        predicate = predicate_builder.equal('_double', 1.000)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+        predicate = predicate_builder.equal('_string', 'B')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+        predicate = predicate_builder.equal('_boolean', True)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+    def testEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.equal('f0', 1)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0]])
+
+    def testNotEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.not_equal('f0', 1)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
+
+    def testNotEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.not_equal('f0', 1)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
+
+    def testLessThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+    def testLessThanPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+    def testLessOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_or_equal('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testLessOrEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_or_equal('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testGreaterThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_than('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
+
+    def testGreaterThanPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_than('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
+
+    def testGreaterOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_or_equal('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+    def testGreaterOrEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_or_equal('f0', 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+    def testIsNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
+
+    def testIsNullPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
+
+    def testIsNotNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
+
+    def testIsNotNullPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
+
+    def testStartswithAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.startswith('f1', 'ab')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+    def testStartswithPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.startswith('f1', 'ab')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+    def testEndswithAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.endswith('f1', 'bc')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testEndswithPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.endswith('f1', 'bc')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testContainsAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.contains('f1', 'bb')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+    def testContainsPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.contains('f1', 'bb')
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+    def testIsInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_in('f0', [1, 2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+    def testIsInPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_in('f1', ['abc', 'd'])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3]])
+
+    def testIsNotInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_in('f0', [1, 2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+    def testIsNotInPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_in('f1', ['abc', 'd'])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2])
+
+    def testBetweenAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.between('f0', 1, 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testBetweenPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.between('f0', 1, 3)
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+    def testAndPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 1)
+        predicate2 = predicate_builder.startswith('f1', 'ab')
+        predicate = predicate_builder.and_predicates([predicate1, predicate2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+    def testOrPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 3)
+        predicate2 = predicate_builder.less_than('f0', 2)
+        predicate = predicate_builder.or_predicates([predicate1, predicate2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate),
+                               self.df.loc[[0, 3, 4]])
diff --git a/paimon_python_java/tests/utils.py b/paimon_python_java/tests/utils.py
index 2af2fe0..350f80e 100644
--- a/paimon_python_java/tests/utils.py
+++ b/paimon_python_java/tests/utils.py
@@ -23,6 +23,11 @@
 
 
 def setup_hadoop_bundle_jar(hadoop_dir):
+    if constants.PYPAIMON_HADOOP_CLASSPATH in os.environ:
+        file = os.environ[constants.PYPAIMON_HADOOP_CLASSPATH]
+        if os.path.isfile(file):
+            return
+
     url = 'https://repo.maven.apache.org/maven2/org/apache/flink/' \
           'flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar'
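With the gateway fallback fixed, either environment variable now supplies the Hadoop jars; the dedicated pypaimon variable (as set in the CI workflow above) takes precedence over `HADOOP_CLASSPATH`. A hedged sketch of configuring it programmatically before creating a catalog; the jar and warehouse paths are illustrative:

```python
import os

from paimon_python_java import Catalog, constants

# Point pypaimon at an existing Hadoop bundle instead of downloading one.
os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = '/path/to/bundled-hadoop.jar'

catalog = Catalog.create({'warehouse': '/tmp/warehouse'})
```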