From 6b9abe4a1acf55e5617f9548a0def03288c0af7a Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 24 Dec 2025 17:10:32 +0800 Subject: [PATCH 1/2] row filter poc --- docs/static/rest-catalog-open-api.yaml | 79 ++++++- .../java/org/apache/paimon/rest/RESTApi.java | 3 +- .../paimon/rest/filter/CompoundFilter.java | 57 +++++ .../rest/filter/CompoundFilterFunction.java | 25 +++ .../rest/filter/FieldFilterTransform.java | 67 ++++++ .../org/apache/paimon/rest/filter/Filter.java | 30 +++ .../paimon/rest/filter/FilterTransform.java | 27 +++ .../rest/filter/LeafFilterFunction.java | 37 +++ .../paimon/rest/filter/TransformFilter.java | 69 ++++++ .../responses/AuthTableQueryResponse.java | 9 +- .../paimon/catalog/AbstractCatalog.java | 3 +- .../org/apache/paimon/catalog/Catalog.java | 6 +- .../paimon/catalog/DelegateCatalog.java | 3 +- .../org/apache/paimon/rest/RESTCatalog.java | 8 +- .../rest/filter/FilterPredicateConverter.java | 139 ++++++++++++ .../paimon/table/CatalogEnvironment.java | 8 +- .../table/source/AbstractDataTableScan.java | 6 +- .../paimon/table/source/ReadBuilderImpl.java | 21 +- .../paimon/table/source/TableQueryAuth.java | 11 +- .../paimon/rest/MockRESTCatalogTest.java | 36 +++ .../apache/paimon/rest/RESTApiJsonTest.java | 50 +++++ .../apache/paimon/rest/RESTCatalogServer.java | 9 +- .../rest/filter/FilterConverterTest.java | 210 ++++++++++++++++++ .../paimon/flink/RESTCatalogITCase.java | 29 +++ .../spark/SparkCatalogWithRestTest.java | 23 ++ 25 files changed, 944 insertions(+), 21 deletions(-) create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilter.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilterFunction.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/FieldFilterTransform.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/Filter.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/FilterTransform.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/LeafFilterFunction.java create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/filter/TransformFilter.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/filter/FilterPredicateConverter.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 1cbb02040fd6..7d98d7c398ca 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -2911,9 +2911,86 @@ components: type: object properties: filter: + $ref: '#/components/schemas/Filter' + Filter: + oneOf: + - $ref: '#/components/schemas/CompoundFilter' + - $ref: '#/components/schemas/TransformFilter' + discriminator: + propertyName: type + CompoundFilter: + type: object + required: + - type + - function + - children + properties: + type: + type: string + enum: [compound] + function: + type: string + enum: [AND, OR] + children: type: array items: - type: string + $ref: '#/components/schemas/Filter' + TransformFilter: + type: object + required: + - type + - transform + - function + - literals + properties: + type: + type: string + enum: [transform] + transform: + $ref: '#/components/schemas/FilterTransform' + function: + type: string + enum: + - EQUAL + - NOT_EQUAL + - GREATER_THAN + - GREATER_OR_EQUAL + - LESS_THAN + - LESS_OR_EQUAL + - IN + - NOT_IN + - IS_NULL + - IS_NOT_NULL + - STARTS_WITH + - ENDS_WITH + - CONTAINS + - LIKE + literals: + type: array + items: + nullable: true + FilterTransform: + oneOf: + - $ref: '#/components/schemas/FieldFilterTransform' + discriminator: + propertyName: type + FieldFilterTransform: + type: object + required: + - type + - index + - name + - dataType + properties: + type: + type: string + enum: [field] + index: + type: integer + name: + type: string + dataType: + type: string AlterDatabaseRequest: type: object properties: diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 648c39551391..763d8e06fc0f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -32,6 +32,7 @@ import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.ForbiddenException; import org.apache.paimon.rest.exceptions.NoSuchResourceException; +import org.apache.paimon.rest.filter.Filter; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; import org.apache.paimon.rest.requests.AlterTableRequest; @@ -660,7 +661,7 @@ public void alterTable(Identifier identifier, List changes) { * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for * this table */ - public List authTableQuery(Identifier identifier, @Nullable List select) { + public @Nullable Filter authTableQuery(Identifier identifier, @Nullable List select) { AuthTableQueryRequest request = new AuthTableQueryRequest(select); AuthTableQueryResponse response = client.post( diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilter.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilter.java new file mode 100644 index 000000000000..f59708a7da62 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * A compound filter which combines child {@link Filter}s using a {@link CompoundFilterFunction}. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CompoundFilter implements Filter { + + private static final String FIELD_FUNCTION = "function"; + private static final String FIELD_CHILDREN = "children"; + + private final CompoundFilterFunction function; + private final List children; + + @JsonCreator + public CompoundFilter( + @JsonProperty(FIELD_FUNCTION) CompoundFilterFunction function, + @JsonProperty(FIELD_CHILDREN) List children) { + this.function = function; + this.children = children; + } + + @JsonGetter(FIELD_FUNCTION) + public CompoundFilterFunction function() { + return function; + } + + @JsonGetter(FIELD_CHILDREN) + public List children() { + return children; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilterFunction.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilterFunction.java new file mode 100644 index 000000000000..0417c8caadb6 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/CompoundFilterFunction.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +/** Boolean operator used to combine multiple {@link Filter} expressions. */ +public enum CompoundFilterFunction { + AND, + OR +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/FieldFilterTransform.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/FieldFilterTransform.java new file mode 100644 index 000000000000..a45314146586 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/FieldFilterTransform.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A {@link FilterTransform} which references a table field (index/name/type) as the transform + * input. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FieldFilterTransform implements FilterTransform { + + private static final String FIELD_INDEX = "index"; + private static final String FIELD_NAME = "name"; + private static final String FIELD_DATA_TYPE = "dataType"; + + private final int index; + private final String name; + private final DataType dataType; + + @JsonCreator + public FieldFilterTransform( + @JsonProperty(FIELD_INDEX) int index, + @JsonProperty(FIELD_NAME) String name, + @JsonProperty(FIELD_DATA_TYPE) DataType dataType) { + this.index = index; + this.name = name; + this.dataType = dataType; + } + + @JsonGetter(FIELD_INDEX) + public int index() { + return index; + } + + @JsonGetter(FIELD_NAME) + public String name() { + return name; + } + + @JsonGetter(FIELD_DATA_TYPE) + public DataType dataType() { + return dataType; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/Filter.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/Filter.java new file mode 100644 index 000000000000..ca315ee9286a --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/Filter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** Base interface for REST filter expressions. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CompoundFilter.class, name = "compound"), + @JsonSubTypes.Type(value = TransformFilter.class, name = "transform") +}) +public interface Filter {} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/FilterTransform.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/FilterTransform.java new file mode 100644 index 000000000000..8d0b5023830a --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/FilterTransform.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** Transform applied to a field before evaluating a {@link Filter} in REST requests. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({@JsonSubTypes.Type(value = FieldFilterTransform.class, name = "field")}) +public interface FilterTransform {} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/LeafFilterFunction.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/LeafFilterFunction.java new file mode 100644 index 000000000000..abb39b570c65 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/LeafFilterFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +/** Supported leaf comparison functions for REST {@link Filter} expressions. */ +public enum LeafFilterFunction { + EQUAL, + NOT_EQUAL, + GREATER_THAN, + GREATER_OR_EQUAL, + LESS_THAN, + LESS_OR_EQUAL, + IN, + NOT_IN, + IS_NULL, + IS_NOT_NULL, + STARTS_WITH, + ENDS_WITH, + CONTAINS, + LIKE +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/filter/TransformFilter.java b/paimon-api/src/main/java/org/apache/paimon/rest/filter/TransformFilter.java new file mode 100644 index 000000000000..c366eb491352 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/filter/TransformFilter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * A REST {@link Filter} that applies a {@link FilterTransform} to a field, then evaluates the + * transformed result with a {@link LeafFilterFunction} and optional literals. + * + *

This type is designed for JSON serialization/deserialization in REST requests. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class TransformFilter implements Filter { + + private static final String FIELD_TRANSFORM = "transform"; + private static final String FIELD_FUNCTION = "function"; + private static final String FIELD_LITERALS = "literals"; + + private final FilterTransform transform; + private final LeafFilterFunction function; + private final List literals; + + @JsonCreator + public TransformFilter( + @JsonProperty(FIELD_TRANSFORM) FilterTransform transform, + @JsonProperty(FIELD_FUNCTION) LeafFilterFunction function, + @JsonProperty(FIELD_LITERALS) List literals) { + this.transform = transform; + this.function = function; + this.literals = literals; + } + + @JsonGetter(FIELD_TRANSFORM) + public FilterTransform transform() { + return transform; + } + + @JsonGetter(FIELD_FUNCTION) + public LeafFilterFunction function() { + return function; + } + + @JsonGetter(FIELD_LITERALS) + public List literals() { + return literals; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java index 0f833b03302a..70760b1fd378 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java @@ -19,6 +19,7 @@ package org.apache.paimon.rest.responses; import org.apache.paimon.rest.RESTResponse; +import org.apache.paimon.rest.filter.Filter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; @@ -26,8 +27,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; - /** Response for auth table query. */ @JsonIgnoreProperties(ignoreUnknown = true) public class AuthTableQueryResponse implements RESTResponse { @@ -36,15 +35,15 @@ public class AuthTableQueryResponse implements RESTResponse { @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_FILTER) - private final List filter; + private final Filter filter; @JsonCreator - public AuthTableQueryResponse(@JsonProperty(FIELD_FILTER) List filter) { + public AuthTableQueryResponse(@JsonProperty(FIELD_FILTER) Filter filter) { this.filter = filter; } @JsonGetter(FIELD_FILTER) - public List filter() { + public Filter filter() { return filter; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 97e8436a7582..2652c4260998 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -574,7 +574,8 @@ public boolean supportsVersionManagement() { } @Override - public List authTableQuery(Identifier identifier, List select) { + public org.apache.paimon.predicate.Predicate authTableQuery( + Identifier identifier, @Nullable List select) { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 7d98180c3c2e..ea09cf722b6b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -26,6 +26,7 @@ import org.apache.paimon.function.FunctionChange; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -1015,10 +1016,11 @@ void alterFunction( * * @param identifier path of the table to alter partitions * @param select selected fields, null if select all - * @return additional filter for row level access control + * @return row-level access control predicate, null if no additional filter * @throws TableNotExistException if the table does not exist */ - List authTableQuery(Identifier identifier, @Nullable List select) + @Nullable + Predicate authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException; // ==================== Catalog Information ========================== diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 9fc057aa90fd..c40ff3fe86bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.function.FunctionChange; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -406,7 +407,7 @@ public PagedList listPartitionsPaged( } @Override - public List authTableQuery(Identifier identifier, @Nullable List select) + public Predicate authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException { return wrapped.authTableQuery(identifier, select); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index cff58bf71389..51042c3729fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -37,12 +37,15 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.rest.exceptions.ForbiddenException; import org.apache.paimon.rest.exceptions.NoSuchResourceException; import org.apache.paimon.rest.exceptions.NotImplementedException; import org.apache.paimon.rest.exceptions.ServiceFailureException; +import org.apache.paimon.rest.filter.Filter; +import org.apache.paimon.rest.filter.FilterPredicateConverter; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; @@ -524,11 +527,12 @@ public void alterTable( } @Override - public List authTableQuery(Identifier identifier, @Nullable List select) + public Predicate authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException { checkNotSystemTable(identifier, "authTable"); try { - return api.authTableQuery(identifier, select); + Filter filter = api.authTableQuery(identifier, select); + return FilterPredicateConverter.toPredicate(filter); } catch (NoSuchResourceException e) { throw new TableNotExistException(identifier); } catch (ForbiddenException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/filter/FilterPredicateConverter.java b/paimon-core/src/main/java/org/apache/paimon/rest/filter/FilterPredicateConverter.java new file mode 100644 index 000000000000..e4213b3d3fa7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/filter/FilterPredicateConverter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Contains; +import org.apache.paimon.predicate.EndsWith; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.IsNotNull; +import org.apache.paimon.predicate.IsNull; +import org.apache.paimon.predicate.LeafFunction; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.Like; +import org.apache.paimon.predicate.NotEqual; +import org.apache.paimon.predicate.NotIn; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.StartsWith; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; +import org.apache.paimon.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Converter from REST {@link Filter} to Paimon {@link Predicate}. */ +public class FilterPredicateConverter { + + private FilterPredicateConverter() {} + + @Nullable + public static Predicate toPredicate(@Nullable Filter filter) { + if (filter == null) { + return null; + } + + if (filter instanceof CompoundFilter) { + CompoundFilter cp = (CompoundFilter) filter; + List children = + cp.children() == null + ? new ArrayList<>() + : cp.children().stream() + .map(FilterPredicateConverter::toPredicate) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + CompoundPredicate.Function fn = + cp.function() == CompoundFilterFunction.OR ? Or.INSTANCE : And.INSTANCE; + return new CompoundPredicate(fn, children); + } + + if (filter instanceof TransformFilter) { + TransformFilter tp = (TransformFilter) filter; + Transform transform = toTransform(tp.transform()); + LeafFunction fn = toLeafFunction(tp.function()); + List literals = tp.literals() == null ? new ArrayList<>() : tp.literals(); + List converted = new ArrayList<>(literals.size()); + DataType literalType = transform.outputType(); + for (Object o : literals) { + converted.add(PredicateBuilder.convertJavaObject(literalType, o)); + } + return TransformPredicate.of(transform, fn, converted); + } + + throw new IllegalArgumentException( + "Unsupported filter type: " + filter.getClass().getName()); + } + + private static Transform toTransform(FilterTransform transform) { + if (transform instanceof FieldFilterTransform) { + FieldFilterTransform ft = (FieldFilterTransform) transform; + return new FieldTransform(new FieldRef(ft.index(), ft.name(), ft.dataType())); + } + throw new IllegalArgumentException( + "Unsupported transform type: " + transform.getClass().getName()); + } + + private static LeafFunction toLeafFunction(LeafFilterFunction function) { + switch (function) { + case EQUAL: + return Equal.INSTANCE; + case NOT_EQUAL: + return NotEqual.INSTANCE; + case GREATER_THAN: + return GreaterThan.INSTANCE; + case GREATER_OR_EQUAL: + return GreaterOrEqual.INSTANCE; + case LESS_THAN: + return LessThan.INSTANCE; + case LESS_OR_EQUAL: + return LessOrEqual.INSTANCE; + case IN: + return In.INSTANCE; + case NOT_IN: + return NotIn.INSTANCE; + case IS_NULL: + return IsNull.INSTANCE; + case IS_NOT_NULL: + return IsNotNull.INSTANCE; + case STARTS_WITH: + return StartsWith.INSTANCE; + case ENDS_WITH: + return EndsWith.INSTANCE; + case CONTAINS: + return Contains.INSTANCE; + case LIKE: + return Like.INSTANCE; + default: + throw new IllegalArgumentException("Unsupported leaf function: " + function); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index 0f61f8fa0978..06e6db765a23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -37,9 +37,10 @@ import javax.annotation.Nullable; import java.io.Serializable; -import java.util.Collections; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + /** Catalog environment in table which contains log factory, metastore client factory. */ public class CatalogEnvironment implements Serializable { @@ -154,10 +155,11 @@ public CatalogEnvironment copy(Identifier identifier) { public TableQueryAuth tableQueryAuth(CoreOptions options) { if (!options.queryAuthEnabled() || catalogLoader == null) { - return select -> Collections.emptyList(); + return select -> null; } + final CatalogLoader loader = checkNotNull(catalogLoader); return select -> { - try (Catalog catalog = catalogLoader.load()) { + try (Catalog catalog = loader.load()) { return catalog.authTableQuery(identifier, select); } catch (Exception e) { throw new RuntimeException(e); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index dcadfad1ff3a..47f28b052ace 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -169,8 +169,10 @@ protected void authQuery() { if (!options.queryAuthEnabled()) { return; } - queryAuth.auth(readType == null ? null : readType.getFieldNames()); - // TODO add support for row level access control + Predicate rowFilter = queryAuth.auth(readType == null ? null : readType.getFieldNames()); + if (rowFilter != null) { + snapshotReader.withFilter(rowFilter); + } } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index c81dfd8e01dd..cdac665c9c30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -26,6 +26,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.TopN; import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; @@ -243,7 +244,25 @@ private InnerTableScan configureScan(InnerTableScan scan) { @Override public TableRead newRead() { - InnerTableRead read = table.newRead().withFilter(filter); + Predicate readFilter = filter; + if (table instanceof FileStoreTable) { + CoreOptions options = new CoreOptions(table.options()); + if (options.queryAuthEnabled()) { + TableQueryAuth queryAuth = + ((FileStoreTable) table).catalogEnvironment().tableQueryAuth(options); + Predicate rowFilter = + queryAuth.auth(readType == null ? null : readType.getFieldNames()); + if (rowFilter != null) { + readFilter = + readFilter == null + ? rowFilter + : PredicateBuilder.and(readFilter, rowFilter); + } + } + } + + InnerTableRead read = table.newRead().withFilter(readFilter); + read.executeFilter(); if (readType != null) { read.withReadType(readType); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java index 96a0dfb3a591..61b03e29e313 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java @@ -18,6 +18,8 @@ package org.apache.paimon.table.source; +import org.apache.paimon.predicate.Predicate; + import javax.annotation.Nullable; import java.util.List; @@ -25,5 +27,12 @@ /** Table query auth. */ public interface TableQueryAuth { - List auth(@Nullable List select); + /** + * Authorize table query and return a row-level access control predicate. + * + * @param select select columns, null if select all + * @return row-level access control predicate, null if no additional filter + */ + @Nullable + Predicate auth(@Nullable List select); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java index 150a8e28eb96..0ed43302f58a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java @@ -35,8 +35,13 @@ import org.apache.paimon.rest.auth.DLFTokenLoaderFactory; import org.apache.paimon.rest.auth.RESTAuthParameter; import org.apache.paimon.rest.exceptions.NotAuthorizedException; +import org.apache.paimon.rest.filter.FieldFilterTransform; +import org.apache.paimon.rest.filter.LeafFilterFunction; +import org.apache.paimon.rest.filter.TransformFilter; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -47,14 +52,17 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.rest.RESTApi.HEADER_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -246,6 +254,34 @@ void testCreateFormatTableWhenEnableDataToken() throws Exception { catalog.dropTable(identifier, true); } + @Test + void testRowFilter() throws Exception { + Identifier identifier = Identifier.create("test_table_db", "auth_table_filter"); + catalog.createDatabase(identifier.getDatabaseName(), true); + catalog.createTable( + identifier, + new Schema( + Collections.singletonList(new DataField(0, "col1", DataTypes.INT())), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"), + ""), + true); + + Table table = catalog.getTable(identifier); + batchWrite(table, Arrays.asList(1, 2, 3, 4)); + + // Only allow rows with col1 > 2 + restCatalogServer.addTableFilter( + identifier, + new TransformFilter( + new FieldFilterTransform(0, "col1", DataTypes.INT()), + LeafFilterFunction.GREATER_THAN, + java.util.Collections.singletonList(2))); + + assertThat(batchRead(table)).containsExactly("+I[3]", "+I[4]"); + } + private void checkHeader(String headerName, String headerValue) { // Verify that the header were included in the requests List> receivedHeaders = restCatalogServer.getReceivedHeaders(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java index 10aea7f3b40e..324997ca3652 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java @@ -18,6 +18,12 @@ package org.apache.paimon.rest; +import org.apache.paimon.rest.filter.CompoundFilter; +import org.apache.paimon.rest.filter.CompoundFilterFunction; +import org.apache.paimon.rest.filter.FieldFilterTransform; +import org.apache.paimon.rest.filter.Filter; +import org.apache.paimon.rest.filter.LeafFilterFunction; +import org.apache.paimon.rest.filter.TransformFilter; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; import org.apache.paimon.rest.requests.AlterTableRequest; @@ -29,6 +35,7 @@ import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.RollbackTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; +import org.apache.paimon.rest.responses.AuthTableQueryResponse; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; @@ -49,6 +56,7 @@ import org.junit.Test; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -298,4 +306,46 @@ public void alterFunctionRequestParseTest() throws JsonProcessingException { AlterFunctionRequest parseData = RESTApi.fromJson(requestStr, AlterFunctionRequest.class); assertEquals(parseData.changes().size(), request.changes().size()); } + + @Test + public void authTableQueryResponseWithPredicateParseTest() throws Exception { + AuthTableQueryResponse response = + new AuthTableQueryResponse( + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.EQUAL, + java.util.Collections.singletonList(1))); + String responseStr = RESTApi.toJson(response); + AuthTableQueryResponse parseData = + RESTApi.fromJson(responseStr, AuthTableQueryResponse.class); + assertTrue(parseData.filter() instanceof TransformFilter); + } + + @Test + public void authTableQueryResponseWithCompoundPredicateParseTest() throws Exception { + Filter p1 = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.GREATER_THAN, + java.util.Collections.singletonList(10)); + Filter p2 = + new TransformFilter( + new FieldFilterTransform(1, "b", DataTypes.STRING()), + LeafFilterFunction.STARTS_WITH, + java.util.Collections.singletonList("x")); + AuthTableQueryResponse response = + new AuthTableQueryResponse( + new CompoundFilter(CompoundFilterFunction.OR, Arrays.asList(p1, p2))); + + String responseStr = RESTApi.toJson(response); + AuthTableQueryResponse parseData = + RESTApi.fromJson(responseStr, AuthTableQueryResponse.class); + + assertTrue(parseData.filter() instanceof CompoundFilter); + CompoundFilter cp = (CompoundFilter) parseData.filter(); + assertEquals(CompoundFilterFunction.OR, cp.function()); + assertEquals(2, cp.children().size()); + assertTrue(cp.children().get(0) instanceof TransformFilter); + assertTrue(cp.children().get(1) instanceof TransformFilter); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index f4cad6e47d8a..ff9e47a08e56 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -43,6 +43,7 @@ import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.RESTAuthParameter; +import org.apache.paimon.rest.filter.Filter; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; import org.apache.paimon.rest.requests.AlterTableRequest; @@ -182,6 +183,7 @@ public class RESTCatalogServer { private final List noPermissionTables = new ArrayList<>(); private final Map functionStore = new HashMap<>(); private final Map> columnAuthHandler = new HashMap<>(); + private final Map rowFilterAuthHandler = new HashMap<>(); public final ConfigResponse configResponse; public final String warehouse; @@ -263,6 +265,10 @@ public void addTableColumnAuth(Identifier identifier, List select) { columnAuthHandler.put(identifier.getFullName(), select); } + public void addTableFilter(Identifier identifier, Filter filter) { + rowFilterAuthHandler.put(identifier.getFullName(), filter); + } + public RESTToken getDataToken(Identifier identifier) { return DataTokenStore.getDataToken(warehouse, identifier.getFullName()); } @@ -825,7 +831,8 @@ private MockResponse authTable(Identifier identifier, String data) throws Except } }); } - AuthTableQueryResponse response = new AuthTableQueryResponse(Collections.emptyList()); + Filter filter = rowFilterAuthHandler.get(identifier.getFullName()); + AuthTableQueryResponse response = new AuthTableQueryResponse(filter); return mockResponse(response, 200); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java new file mode 100644 index 000000000000..cae41c417a3c --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.filter; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.types.DataTypes; + +import org.junit.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link FilterPredicateConverter}. */ +public class FilterConverterTest { + + @Test + public void testTransformPredicateConversion() { + TransformFilter rest = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.EQUAL, + Collections.singletonList(5)); + + Predicate predicate = FilterPredicateConverter.toPredicate(rest); + assertNotNull(predicate); + + assertTrue(predicate.test(GenericRow.of(5))); + assertFalse(predicate.test(GenericRow.of(6))); + } + + @Test + public void testCompoundPredicateConversion() { + Filter p1 = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.EQUAL, + Collections.singletonList(5)); + Filter p2 = + new TransformFilter( + new FieldFilterTransform(1, "b", DataTypes.STRING()), + LeafFilterFunction.EQUAL, + Collections.singletonList("x")); + CompoundFilter rest = new CompoundFilter(CompoundFilterFunction.AND, Arrays.asList(p1, p2)); + + Predicate predicate = FilterPredicateConverter.toPredicate(rest); + assertNotNull(predicate); + + assertTrue(predicate.test(GenericRow.of(5, BinaryString.fromString("x")))); + assertFalse(predicate.test(GenericRow.of(5, BinaryString.fromString("y")))); + assertFalse(predicate.test(GenericRow.of(6, BinaryString.fromString("x")))); + } + + @Test + public void testOrCompoundPredicateConversion() { + Filter p1 = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.EQUAL, + Collections.singletonList(5)); + Filter p2 = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.EQUAL, + Collections.singletonList(6)); + CompoundFilter rest = new CompoundFilter(CompoundFilterFunction.OR, Arrays.asList(p1, p2)); + + Predicate predicate = FilterPredicateConverter.toPredicate(rest); + assertNotNull(predicate); + + assertTrue(predicate.test(GenericRow.of(5))); + assertTrue(predicate.test(GenericRow.of(6))); + assertFalse(predicate.test(GenericRow.of(7))); + } + + @Test + public void testInFunctionConversion() { + TransformFilter rest = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.INT()), + LeafFilterFunction.IN, + Arrays.asList(1, 2, 3)); + + Predicate predicate = FilterPredicateConverter.toPredicate(rest); + assertNotNull(predicate); + + assertTrue(predicate.test(GenericRow.of(2))); + assertFalse(predicate.test(GenericRow.of(5))); + } + + @Test + public void testIsNullFunctionConversion() { + TransformFilter rest = + new TransformFilter( + new FieldFilterTransform(0, "a", DataTypes.STRING()), + LeafFilterFunction.IS_NULL, + null); + + Predicate predicate = FilterPredicateConverter.toPredicate(rest); + assertNotNull(predicate); + + assertTrue(predicate.test(GenericRow.of((Object) null))); + assertFalse(predicate.test(GenericRow.of(BinaryString.fromString("x")))); + } + + @Test + public void testLiteralConversions() { + // BOOLEAN from string + Predicate boolEq = + FilterPredicateConverter.toPredicate( + new TransformFilter( + new FieldFilterTransform(0, "b", DataTypes.BOOLEAN()), + LeafFilterFunction.EQUAL, + Collections.singletonList("true"))); + assertNotNull(boolEq); + assertTrue(boolEq.test(GenericRow.of(true))); + assertFalse(boolEq.test(GenericRow.of(false))); + + // VARBINARY/BINARY from base64 string + byte[] bytes = new byte[] {1, 2, 3}; + String b64 = Base64.getEncoder().encodeToString(bytes); + Predicate bytesEq = + FilterPredicateConverter.toPredicate( + new TransformFilter( + new FieldFilterTransform(0, "c", DataTypes.VARBINARY(3)), + LeafFilterFunction.EQUAL, + Collections.singletonList(b64))); + assertNotNull(bytesEq); + assertTrue(bytesEq.test(GenericRow.of(bytes))); + assertFalse(bytesEq.test(GenericRow.of(new byte[] {1, 2, 4}))); + + // DECIMAL from string + Decimal expectedDec = Decimal.fromBigDecimal(new BigDecimal("12.34"), 10, 2); + Predicate decEq = + FilterPredicateConverter.toPredicate( + new TransformFilter( + new FieldFilterTransform(0, "d", DataTypes.DECIMAL(10, 2)), + LeafFilterFunction.EQUAL, + Collections.singletonList("12.34"))); + assertNotNull(decEq); + assertTrue(decEq.test(GenericRow.of(expectedDec))); + assertFalse( + decEq.test(GenericRow.of(Decimal.fromBigDecimal(new BigDecimal("12.35"), 10, 2)))); + + // DATE from ISO string -> epoch day int + LocalDate date = LocalDate.parse("2025-12-23"); + int epochDay = (int) date.toEpochDay(); + Predicate dateEq = + FilterPredicateConverter.toPredicate( + new TransformFilter( + new FieldFilterTransform(0, "e", DataTypes.DATE()), + LeafFilterFunction.EQUAL, + Collections.singletonList("2025-12-23"))); + assertNotNull(dateEq); + assertTrue(dateEq.test(GenericRow.of(epochDay))); + assertFalse(dateEq.test(GenericRow.of(epochDay + 1))); + + // TIMESTAMP from string (java.sql.Timestamp.valueOf compatible) + Timestamp ts = Timestamp.fromEpochMillis(1_700_000_000_000L); + String tsString = ts.toSQLTimestamp().toString(); + Predicate tsEq = + FilterPredicateConverter.toPredicate( + new TransformFilter( + new FieldFilterTransform(0, "f", DataTypes.TIMESTAMP_MILLIS()), + LeafFilterFunction.EQUAL, + Collections.singletonList(tsString))); + assertNotNull(tsEq); + assertTrue(tsEq.test(GenericRow.of(ts))); + assertFalse(tsEq.test(GenericRow.of(Timestamp.fromEpochMillis(ts.getMillisecond() + 1)))); + } + + @Test + public void testDecimalOverflowThrows() { + TransformFilter rest = + new TransformFilter( + new FieldFilterTransform(0, "d", DataTypes.DECIMAL(2, 0)), + LeafFilterFunction.EQUAL, + Collections.singletonList("123")); + + assertThrows( + IllegalArgumentException.class, () -> FilterPredicateConverter.toPredicate(rest)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index d79aa713c9dd..622088255978 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -20,6 +20,10 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.rest.RESTToken; +import org.apache.paimon.rest.filter.FieldFilterTransform; +import org.apache.paimon.rest.filter.LeafFilterFunction; +import org.apache.paimon.rest.filter.TransformFilter; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -34,6 +38,7 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -81,6 +86,30 @@ public void testWriteAndRead() { .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D)); } + @Test + public void testRowFilter() { + String rowFilterTable = "row_filter_table"; + batchSql( + String.format( + "CREATE TABLE %s.%s (col1 INT) WITH ('query-auth.enabled' = 'true')", + DATABASE_NAME, rowFilterTable)); + batchSql( + String.format( + "INSERT INTO %s.%s VALUES (1), (2), (3), (4)", + DATABASE_NAME, rowFilterTable)); + + // Only allow rows with col1 > 2 + restCatalogServer.addTableFilter( + Identifier.create(DATABASE_NAME, rowFilterTable), + new TransformFilter( + new FieldFilterTransform(0, "col1", DataTypes.INT()), + LeafFilterFunction.GREATER_THAN, + Collections.singletonList(2))); + + assertThat(batchSql(String.format("SELECT col1 FROM %s.%s", DATABASE_NAME, rowFilterTable))) + .containsExactlyInAnyOrder(Row.of(3), Row.of(4)); + } + @Test public void testExpiredDataToken() { Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java index ee8978c68767..fc1396d4c33a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java @@ -30,6 +30,9 @@ import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.AuthProviderEnum; import org.apache.paimon.rest.auth.BearTokenAuthProvider; +import org.apache.paimon.rest.filter.FieldFilterTransform; +import org.apache.paimon.rest.filter.LeafFilterFunction; +import org.apache.paimon.rest.filter.TransformFilter; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.spark.catalog.WithPaimonCatalog; import org.apache.paimon.types.DataField; @@ -48,6 +51,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -237,6 +241,25 @@ public void testMapFunction() throws Exception { cleanFunction(functionName); } + @Test + public void testRowFilter() { + spark.sql( + "CREATE TABLE t_row_filter (col1 INT) TBLPROPERTIES" + + " ('bucket'='1', 'bucket-key'='col1', 'file.format'='avro', 'query-auth.enabled'='true')"); + spark.sql("INSERT INTO t_row_filter VALUES (1), (2), (3), (4)"); + + // Only allow rows with col1 > 2 + restCatalogServer.addTableFilter( + Identifier.create("db2", "t_row_filter"), + new TransformFilter( + new FieldFilterTransform(0, "col1", DataTypes.INT()), + LeafFilterFunction.GREATER_THAN, + Collections.singletonList(2))); + + assertThat(spark.sql("SELECT col1 FROM t_row_filter").collectAsList().toString()) + .isEqualTo("[[3], [4]]"); + } + private Catalog getPaimonCatalog() { CatalogManager catalogManager = spark.sessionState().catalogManager(); WithPaimonCatalog withPaimonCatalog = (WithPaimonCatalog) catalogManager.currentCatalog(); From 867d1b5c04dea83b7ad87ef0299bf21b47ccd6ac Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Thu, 25 Dec 2025 13:12:15 +0800 Subject: [PATCH 2/2] fix FilterConverterTest --- .../rest/filter/FilterConverterTest.java | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java index cae41c417a3c..796f2c70aeec 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/filter/FilterConverterTest.java @@ -29,13 +29,12 @@ import java.math.BigDecimal; import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.Arrays; -import java.util.Base64; import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for {@link FilterPredicateConverter}. */ @@ -132,44 +131,31 @@ public void testIsNullFunctionConversion() { @Test public void testLiteralConversions() { - // BOOLEAN from string + // BOOLEAN from boolean Predicate boolEq = FilterPredicateConverter.toPredicate( new TransformFilter( new FieldFilterTransform(0, "b", DataTypes.BOOLEAN()), LeafFilterFunction.EQUAL, - Collections.singletonList("true"))); + Collections.singletonList(true))); assertNotNull(boolEq); assertTrue(boolEq.test(GenericRow.of(true))); assertFalse(boolEq.test(GenericRow.of(false))); - // VARBINARY/BINARY from base64 string - byte[] bytes = new byte[] {1, 2, 3}; - String b64 = Base64.getEncoder().encodeToString(bytes); - Predicate bytesEq = - FilterPredicateConverter.toPredicate( - new TransformFilter( - new FieldFilterTransform(0, "c", DataTypes.VARBINARY(3)), - LeafFilterFunction.EQUAL, - Collections.singletonList(b64))); - assertNotNull(bytesEq); - assertTrue(bytesEq.test(GenericRow.of(bytes))); - assertFalse(bytesEq.test(GenericRow.of(new byte[] {1, 2, 4}))); - - // DECIMAL from string + // DECIMAL from BigDecimal Decimal expectedDec = Decimal.fromBigDecimal(new BigDecimal("12.34"), 10, 2); Predicate decEq = FilterPredicateConverter.toPredicate( new TransformFilter( new FieldFilterTransform(0, "d", DataTypes.DECIMAL(10, 2)), LeafFilterFunction.EQUAL, - Collections.singletonList("12.34"))); + Collections.singletonList(new BigDecimal("12.34")))); assertNotNull(decEq); assertTrue(decEq.test(GenericRow.of(expectedDec))); assertFalse( decEq.test(GenericRow.of(Decimal.fromBigDecimal(new BigDecimal("12.35"), 10, 2)))); - // DATE from ISO string -> epoch day int + // DATE from LocalDate -> epoch day int LocalDate date = LocalDate.parse("2025-12-23"); int epochDay = (int) date.toEpochDay(); Predicate dateEq = @@ -177,34 +163,24 @@ public void testLiteralConversions() { new TransformFilter( new FieldFilterTransform(0, "e", DataTypes.DATE()), LeafFilterFunction.EQUAL, - Collections.singletonList("2025-12-23"))); + Collections.singletonList(date))); assertNotNull(dateEq); assertTrue(dateEq.test(GenericRow.of(epochDay))); assertFalse(dateEq.test(GenericRow.of(epochDay + 1))); - // TIMESTAMP from string (java.sql.Timestamp.valueOf compatible) - Timestamp ts = Timestamp.fromEpochMillis(1_700_000_000_000L); - String tsString = ts.toSQLTimestamp().toString(); + // TIMESTAMP from LocalDateTime + LocalDateTime dateTime = LocalDateTime.of(2025, 12, 23, 12, 34, 56, 789_000_000); + Timestamp ts = Timestamp.fromLocalDateTime(dateTime); Predicate tsEq = FilterPredicateConverter.toPredicate( new TransformFilter( new FieldFilterTransform(0, "f", DataTypes.TIMESTAMP_MILLIS()), LeafFilterFunction.EQUAL, - Collections.singletonList(tsString))); + Collections.singletonList(dateTime))); assertNotNull(tsEq); assertTrue(tsEq.test(GenericRow.of(ts))); - assertFalse(tsEq.test(GenericRow.of(Timestamp.fromEpochMillis(ts.getMillisecond() + 1)))); - } - - @Test - public void testDecimalOverflowThrows() { - TransformFilter rest = - new TransformFilter( - new FieldFilterTransform(0, "d", DataTypes.DECIMAL(2, 0)), - LeafFilterFunction.EQUAL, - Collections.singletonList("123")); - - assertThrows( - IllegalArgumentException.class, () -> FilterPredicateConverter.toPredicate(rest)); + assertFalse( + tsEq.test( + GenericRow.of(Timestamp.fromLocalDateTime(dateTime.plusNanos(1_000_000))))); } }