Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;

/**
* Function which is not bridged by {@link BridgingSqlFunction} and which is not legacy {@link
* TableFunction} however whose {@link FunctionDefinition} might be queried.
*/
public interface FunctionDefinitionQueryable {
FunctionDefinition getFunctionDefinition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.fun.SqlJsonArrayFunction;
Expand All @@ -29,7 +32,8 @@
* This class is a wrapper class for the {@link SqlJsonArrayFunction} but using the {@code
* VARCHAR_NOT_NULL} return type inference.
*/
class SqlJsonArrayFunctionWrapper extends SqlJsonArrayFunction {
class SqlJsonArrayFunctionWrapper extends SqlJsonArrayFunction
implements FunctionDefinitionQueryable {

@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
Expand All @@ -49,4 +53,9 @@ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
public SqlReturnTypeInference getReturnTypeInference() {
return VARCHAR_NOT_NULL;
}

@Override
public FunctionDefinition getFunctionDefinition() {
return BuiltInFunctionDefinitions.JSON_ARRAY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.fun.SqlJsonObjectFunction;
Expand All @@ -29,7 +32,8 @@
* This class is a wrapper class for the {@link SqlJsonObjectFunction} but using the {@code
* VARCHAR_NOT_NULL} return type inference.
*/
class SqlJsonObjectFunctionWrapper extends SqlJsonObjectFunction {
class SqlJsonObjectFunctionWrapper extends SqlJsonObjectFunction
implements FunctionDefinitionQueryable {

@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
Expand All @@ -49,4 +53,9 @@ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
public SqlReturnTypeInference getReturnTypeInference() {
return VARCHAR_NOT_NULL;
}

@Override
public FunctionDefinition getFunctionDefinition() {
return BuiltInFunctionDefinitions.JSON_OBJECT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCallBinding;
Expand All @@ -42,7 +44,8 @@
* This class is a wrapper class for the {@link SqlJsonQueryFunction} but using the {@code
* VARCHAR_FORCE_NULLABLE} return type inference.
*/
class SqlJsonQueryFunctionWrapper extends SqlJsonQueryFunction {
class SqlJsonQueryFunctionWrapper extends SqlJsonQueryFunction
implements FunctionDefinitionQueryable {
private final SqlReturnTypeInference returnTypeInference;

SqlJsonQueryFunctionWrapper() {
Expand Down Expand Up @@ -142,4 +145,9 @@ private static RelDataType explicitTypeSpec(SqlOperatorBinding opBinding) {
}
return null;
}

@Override
public FunctionDefinition getFunctionDefinition() {
return BuiltInFunctionDefinitions.JSON_QUERY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlJsonValueReturning;
import org.apache.calcite.sql.SqlOperatorBinding;
Expand All @@ -35,7 +38,8 @@
* VARCHAR_FORCE_NULLABLE} return type inference by default. It also supports specifying return type
* with the RETURNING keyword just like the original {@link SqlJsonValueFunction}.
*/
class SqlJsonValueFunctionWrapper extends SqlJsonValueFunction {
class SqlJsonValueFunctionWrapper extends SqlJsonValueFunction
implements FunctionDefinitionQueryable {

private final SqlReturnTypeInference returnTypeInference;

Expand Down Expand Up @@ -80,4 +84,9 @@ private static RelDataType explicitTypeSpec(SqlOperatorBinding opBinding) {
}
return null;
}

@Override
public FunctionDefinition getFunctionDefinition() {
return BuiltInFunctionDefinitions.JSON_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -99,10 +100,11 @@ protected Transformation<RowData> translateToPlanInternal(
final CodeGenOperatorFactory<RowData> substituteStreamOperator =
CalcCodeGenerator.generateCalcOperator(
ctx,
inputTransform,
(RowType) inputEdge.getOutputType(),
(RowType) getOutputType(),
JavaScalaConversionUtil.toScala(projection),
JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),
ShortcutUtils.unwrapTypeFactory(planner),
retainHeader,
getClass().getSimpleName());
return ExecNodeUtil.createOneInputTransformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ protected StreamOperatorFactory<RowData> createAsyncLookupJoin(
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
projectionOutputRelDataType,
tableSourceRowType);
tableSourceRowType,
ShortcutUtils.unwrapTypeFactory(relBuilder));
asyncFunc =
new AsyncLookupJoinWithCalcRunner(
generatedFuncWithType.tableFunc(),
Expand Down Expand Up @@ -647,7 +648,8 @@ protected ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
projectionOutputRelDataType,
tableSourceRowType);
tableSourceRowType,
ShortcutUtils.unwrapTypeFactory(relBuilder));

processFunc =
new LookupJoinWithCalcRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ private DeltaJoinRuntimeTree.Node convert2RuntimeTreeInternal(
node.filter,
rowTypePassThroughCalc,
rowTypeBeforeCalc,
generatedCalcName))
generatedCalcName,
typeFactory))
.orElse(null);

if (node instanceof BinaryInputNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ private static LookupHandlerBase generateLookupHandler(
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
lookupSidePassThroughCalcRowType,
lookupTableSourceRowType);
lookupTableSourceRowType,
typeFactory);
}

Preconditions.checkState(!generatedFetcherCollector.containsKey(lookupTableOrdinal));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.FunctionDefinitionQueryable;
import org.apache.flink.table.planner.functions.utils.TableSqlFunction;

import org.apache.calcite.plan.Context;
Expand All @@ -40,13 +41,20 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.tools.RelBuilder;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Utilities for quick access of commonly used instances (like {@link FlinkTypeFactory}) without
* long chains of getters or casting like {@code (FlinkTypeFactory)
Expand Down Expand Up @@ -147,14 +155,18 @@ public static DataTypeFactory unwrapDataTypeFactory(RelBuilder relBuilder) {
return null;
}
final RexCall call = (RexCall) rexNode;
if (!(call.getOperator() instanceof BridgingSqlFunction)) {
final SqlOperator operator = call.getOperator();
if (!(operator instanceof BridgingSqlFunction)) {
if (operator instanceof FunctionDefinitionQueryable) {
return ((FunctionDefinitionQueryable) operator).getFunctionDefinition();
}
// legacy
if (call.getOperator() instanceof TableSqlFunction) {
return ((TableSqlFunction) call.getOperator()).udtf();
if (operator instanceof TableSqlFunction) {
return ((TableSqlFunction) operator).udtf();
}
return null;
}
return ((BridgingSqlFunction) call.getOperator()).getDefinition();
return ((BridgingSqlFunction) operator).getDefinition();
}

public static @Nullable FunctionDefinition unwrapFunctionDefinition(SqlOperator operator) {
Expand All @@ -169,6 +181,56 @@ public static boolean isFunctionKind(SqlOperator operator, FunctionKind kind) {
return functionDefinition != null && functionDefinition.getKind() == kind;
}

public static boolean isOneOfFunctionDefinitions(
RexNode rexNode, FunctionDefinition... expectedDefinitions) {
if (!(rexNode instanceof RexCall)) {
return false;
}
final RexCall call = (RexCall) rexNode;
final FunctionDefinition unwrapped = unwrapFunctionDefinition(call);
for (FunctionDefinition expected : expectedDefinitions) {
if (unwrapped == expected) {
return true;
}
}
return false;
}

public static boolean isDeterministicThroughProgram(
RexNode node, @Nullable List<RexNode> exprs) {
if (exprs == null) {
return RexUtil.isDeterministic(node);
}
return isDeterministicThroughProgram(node, exprs, new HashSet<>());
}

private static boolean isDeterministicThroughProgram(
RexNode node, List<RexNode> exprs, Set<Integer> visited) {
if (node instanceof RexCall) {
final RexCall call = (RexCall) node;
if (!call.getOperator().isDeterministic()) {
return false;
}
for (RexNode operand : call.getOperands()) {
if (!isDeterministicThroughProgram(operand, exprs, visited)) {
return false;
}
}
return true;
}
if (node instanceof RexLocalRef) {
final int idx = ((RexLocalRef) node).getIndex();
// already on the stack: skip rather than recurse forever
return !visited.add(idx)
|| isDeterministicThroughProgram(exprs.get(idx), exprs, visited);
}
if (node instanceof RexFieldAccess) {
return isDeterministicThroughProgram(
((RexFieldAccess) node).getReferenceExpr(), exprs, visited);
}
return true;
}

public static @Nullable BridgingSqlFunction unwrapBridgingSqlFunction(RexCall call) {
final SqlOperator operator = call.getOperator();
if (operator instanceof BridgingSqlFunction) {
Expand Down
Loading