Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/paimon-python-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Set up hadoop dependency
  run: |
    # Download the shaded Hadoop uber jar into a temp directory of the workspace.
    # --fail makes curl exit non-zero on an HTTP error instead of saving the
    # error response body as the jar; --retry covers transient failures.
    mkdir -p "${{ github.workspace }}/temp"
    curl --fail --location --retry 3 \
      -o "${{ github.workspace }}/temp/bundled-hadoop.jar" \
      https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
# Runs the Python lint script from the repository's dev/ directory.
- name: Run lint-python.sh
env:
# Set to the workspace path temp/bundled-hadoop.jar.
# NOTE(review): presumably consumed by paimon_python_java's gateway server as
# the Hadoop classpath — confirm against constants.PYPAIMON_HADOOP_CLASSPATH.
_PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar
run: |
# Make the script executable before invoking it.
chmod +x dev/lint-python.sh
./dev/lint-python.sh
5 changes: 4 additions & 1 deletion paimon_python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .split import Split
from .table_read import TableRead
from .table_scan import TableScan, Plan
from .predicate import Predicate, PredicateBuilder
from .read_builder import ReadBuilder
from .commit_message import CommitMessage
from .table_commit import BatchTableCommit
Expand All @@ -39,5 +40,7 @@
'BatchWriteBuilder',
'Table',
'Schema',
'Catalog'
'Catalog',
'Predicate',
'PredicateBuilder'
]
95 changes: 95 additions & 0 deletions paimon_python_api/predicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import ABC, abstractmethod
from typing import Any, List


class Predicate(ABC):
    """A condition that evaluates to a boolean.

    The interface currently declares no methods: paimon_python_java is the
    only implementation, and it converts the predicate to a Java object.
    """


class PredicateBuilder(ABC):
    """Factory interface producing Predicate objects for common filter conditions."""

    @abstractmethod
    def equal(self, field: str, literal: Any) -> Predicate:
        """Build ``field = literal``."""

    @abstractmethod
    def not_equal(self, field: str, literal: Any) -> Predicate:
        """Build ``field <> literal``."""

    @abstractmethod
    def less_than(self, field: str, literal: Any) -> Predicate:
        """Build ``field < literal``."""

    @abstractmethod
    def less_or_equal(self, field: str, literal: Any) -> Predicate:
        """Build ``field <= literal``."""

    @abstractmethod
    def greater_than(self, field: str, literal: Any) -> Predicate:
        """Build ``field > literal``."""

    @abstractmethod
    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
        """Build ``field >= literal``."""

    @abstractmethod
    def is_null(self, field: str) -> Predicate:
        """Build ``field IS NULL``."""

    @abstractmethod
    def is_not_null(self, field: str) -> Predicate:
        """Build ``field IS NOT NULL``."""

    @abstractmethod
    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
        """Build a predicate matching when field starts with pattern_literal."""

    @abstractmethod
    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
        """Build a predicate matching when field ends with pattern_literal."""

    @abstractmethod
    def contains(self, field: str, pattern_literal: Any) -> Predicate:
        """Build a predicate matching when pattern_literal occurs in field."""

    @abstractmethod
    def is_in(self, field: str, literals: List[Any]) -> Predicate:
        """Build ``field IN literals``."""

    @abstractmethod
    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
        """Build ``field NOT IN literals``."""

    @abstractmethod
    def between(
        self,
        field: str,
        included_lower_bound: Any,
        included_upper_bound: Any,
    ) -> Predicate:
        """Build ``field BETWEEN included_lower_bound AND included_upper_bound``.

        As the parameter names say, both bounds are inclusive.
        """

    @abstractmethod
    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
        """Build the conjunction ``predicate1 AND predicate2 AND ...``."""

    @abstractmethod
    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
        """Build the disjunction ``predicate1 OR predicate2 OR ...``."""
13 changes: 12 additions & 1 deletion paimon_python_api/read_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api import TableRead, TableScan
from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder
from typing import List


class ReadBuilder(ABC):
"""An interface for building the TableScan and TableRead."""

@abstractmethod
def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
    """
    Push a filter down to the read.

    The filter is best-effort: data is filtered as much as possible,
    but the result is not guaranteed to be completely filtered.

    :param predicate: the filter condition to push down.
    :return: a ReadBuilder, mirroring with_projection.
        NOTE(review): presumably this builder itself, for call chaining;
        confirm against the implementations.
    """

@abstractmethod
def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
    """Push a nested projection down to the read."""
Expand All @@ -39,3 +46,7 @@ def new_scan(self) -> TableScan:
@abstractmethod
def new_read(self) -> TableRead:
    """Create the TableRead used to read splits."""

@abstractmethod
def new_predicate_builder(self) -> PredicateBuilder:
    """Create a PredicateBuilder for constructing Predicate objects."""
7 changes: 5 additions & 2 deletions paimon_python_java/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from .util import constants
from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead,
BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit)
BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit,
Predicate, PredicateBuilder)

__all__ = [
'constants',
Expand All @@ -32,5 +33,7 @@
'BatchWriteBuilder',
'BatchTableWrite',
'CommitMessage',
'BatchTableCommit'
'BatchTableCommit',
'Predicate',
'PredicateBuilder'
]
2 changes: 1 addition & 1 deletion paimon_python_java/gateway_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _get_hadoop_classpath(env):
return env[constants.PYPAIMON_HADOOP_CLASSPATH]

if 'HADOOP_CLASSPATH' in env:
return None
return env['HADOOP_CLASSPATH']
else:
raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \
and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")
1 change: 1 addition & 0 deletions paimon_python_java/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def import_paimon_view(gateway):
java_import(gateway.jvm, 'org.apache.paimon.types.*')
java_import(gateway.jvm, 'org.apache.paimon.python.*')
java_import(gateway.jvm, "org.apache.paimon.data.*")
java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")


class Watchdog(object):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.python;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Static helpers for building {@link Predicate}s from a method name and a list of literals.
 *
 * <p>The operation is selected by a string rather than by calling {@link PredicateBuilder}
 * directly, and literals are converted to the representation matching the field type first
 * (see {@link #convertJavaObject}).
 */
public class PredicationUtil {

    /**
     * Builds a leaf predicate on one field.
     *
     * @param rowType row type used to look up the data type of the field at {@code index}
     * @param builder builder the selected operation is delegated to
     * @param method operation name: one of {@code equal}, {@code notEqual}, {@code lessThan},
     *     {@code lessOrEqual}, {@code greaterThan}, {@code greaterOrEqual}, {@code isNull},
     *     {@code isNotNull}, {@code startsWith}, {@code endsWith}, {@code contains}, {@code in},
     *     {@code notIn}, {@code between}
     * @param index index of the field the predicate applies to
     * @param literals operands of the operation; unary operations read element 0, {@code
     *     between} reads elements 0 and 1, {@code in}/{@code notIn} use the whole list, and
     *     {@code isNull}/{@code isNotNull} read none
     * @return the predicate produced by {@code builder}
     * @throws UnsupportedOperationException if {@code method} is not one of the names above, or
     *     if the field type is not supported by the literal conversion
     */
    public static Predicate build(
            RowType rowType,
            PredicateBuilder builder,
            String method,
            int index,
            List<Object> literals) {
        // Every literal is compared against the same field, so all of them are converted
        // according to that field's type before dispatching.
        List<Object> converted =
                literals.stream()
                        .map(l -> convertJavaObject(rowType.getTypeAt(index), l))
                        .collect(Collectors.toList());
        switch (method) {
            case "equal":
                return builder.equal(index, converted.get(0));
            case "notEqual":
                return builder.notEqual(index, converted.get(0));
            case "lessThan":
                return builder.lessThan(index, converted.get(0));
            case "lessOrEqual":
                return builder.lessOrEqual(index, converted.get(0));
            case "greaterThan":
                return builder.greaterThan(index, converted.get(0));
            case "greaterOrEqual":
                return builder.greaterOrEqual(index, converted.get(0));
            case "isNull":
                return builder.isNull(index);
            case "isNotNull":
                return builder.isNotNull(index);
            case "startsWith":
                return builder.startsWith(index, converted.get(0));
            case "endsWith":
                return builder.endsWith(index, converted.get(0));
            case "contains":
                return builder.contains(index, converted.get(0));
            case "in":
                return builder.in(index, converted);
            case "notIn":
                return builder.notIn(index, converted);
            case "between":
                return builder.between(index, converted.get(0), converted.get(1));
            default:
                throw new UnsupportedOperationException(
                        "Unknown PredicateBuilder method " + method);
        }
    }

    /**
     * Converts a literal to the Java representation matching {@code literalType}, because some
     * types are not convenient to transfer from Python to Java.
     *
     * <p>A {@code null} literal is returned as {@code null} for every supported type, so null
     * handling no longer depends on the field type (previously the narrowing numeric cases threw
     * a NullPointerException while the pass-through cases did not).
     *
     * @param literalType data type of the field the literal is compared against
     * @param literal value to convert, may be {@code null}
     * @return the converted literal, or {@code null} if {@code literal} is {@code null}
     * @throws UnsupportedOperationException if the type root of {@code literalType} is not
     *     handled here
     */
    private static Object convertJavaObject(DataType literalType, Object literal) {
        switch (literalType.getTypeRoot()) {
            case BOOLEAN:
            case DOUBLE:
            case INTEGER:
                // Passed through unchanged.
                return literal;
            case CHAR:
            case VARCHAR:
                return literal == null ? null : BinaryString.fromString((String) literal);
            case FLOAT:
                return literal == null ? null : (Object) ((Number) literal).floatValue();
            case TINYINT:
                return literal == null ? null : (Object) ((Number) literal).byteValue();
            case SMALLINT:
                return literal == null ? null : (Object) ((Number) literal).shortValue();
            case BIGINT:
                return literal == null ? null : (Object) ((Number) literal).longValue();
            default:
                throw new UnsupportedOperationException(
                        "Unsupported predicate leaf type " + literalType.getTypeRoot().name());
        }
    }

    /**
     * Combines predicates with logical AND.
     *
     * @param predicates predicates to combine
     * @return the result of {@link PredicateBuilder#and(List)}
     */
    public static Predicate buildAnd(List<Predicate> predicates) {
        // 'and' is a Python keyword, so this wrapper exposes it under another name.
        return PredicateBuilder.and(predicates);
    }

    /**
     * Combines predicates with logical OR.
     *
     * @param predicates predicates to combine
     * @return the result of {@link PredicateBuilder#or(List)}
     */
    public static Predicate buildOr(List<Predicate> predicates) {
        // 'or' is a Python keyword, so this wrapper exposes it under another name.
        return PredicateBuilder.or(predicates);
    }
}
Loading
Loading