Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 44 additions & 116 deletions flink-table/flink-sql-client/src/test/resources/sql/table.q

Large diffs are not rendered by default.

96 changes: 14 additions & 82 deletions flink-table/flink-sql-gateway/src/test/resources/sql/table.q
Original file line number Diff line number Diff line change
Expand Up @@ -682,15 +682,11 @@ LogicalProject(user=[$0], product=[$1])

== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok

# test explain plan for insert
Expand Down Expand Up @@ -725,15 +721,11 @@ LogicalProject(user=[$0], product=[$1])

== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok

# test explain insert
Expand Down Expand Up @@ -844,15 +836,11 @@ LogicalProject(user=[$0], product=[$1])

== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Physical Execution Plan ==
{
Expand All @@ -862,28 +850,6 @@ Calc(select=[user, product])
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
"parallelism" : 1
}, {
"id" : ,
"type" : "Calc[]",
"pact" : "Operator",
"contents" : "[]:Calc(select=[user, product, ts])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "WatermarkAssigner[]",
"pact" : "Operator",
"contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Calc[]",
Expand All @@ -909,16 +875,12 @@ LogicalProject(user=[$0], product=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- Calc(select=[user, product, ts]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok

# test explain select with CHANGELOG_MODE
Expand All @@ -932,15 +894,11 @@ LogicalProject(user=[$0], product=[$1])

== Optimized Physical Plan ==
Calc(select=[user, product], changelogMode=[I])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I])
+- Calc(select=[user, product, ts], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok

# test explain select with all details
Expand All @@ -953,16 +911,12 @@ LogicalProject(user=[$0], product=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- Calc(select=[user, product, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Physical Execution Plan ==
{
Expand All @@ -972,28 +926,6 @@ Calc(select=[user, product])
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
"parallelism" : 1
}, {
"id" : ,
"type" : "Calc[]",
"pact" : "Operator",
"contents" : "[]:Calc(select=[user, product, ts])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "WatermarkAssigner[]",
"pact" : "Operator",
"contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Calc[]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,13 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexPatternFieldRef;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;

Expand All @@ -100,7 +96,6 @@
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isProctimeIndicatorType;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.isTimeIndicatorType;
import static org.apache.flink.table.planner.plan.utils.MatchUtil.isFinalOnMatchTimeIndicator;
import static org.apache.flink.table.planner.plan.utils.WindowUtil.groupingContainsWindowStartEnd;

/**
Expand Down Expand Up @@ -189,7 +184,8 @@ public RelNode visit(LogicalTableModify modify) {

private RelNode visitMatch(FlinkLogicalMatch match) {
RelNode newInput = match.getInput().accept(this);
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newInput);
RexTimeIndicatorMaterializer materializer =
new RexTimeIndicatorMaterializer(rexBuilder, newInput);

Function<Map<String, RexNode>, Map<String, RexNode>> materializeExprs =
rexNodesMap ->
Expand Down Expand Up @@ -295,7 +291,8 @@ private RelNode visitCalc(FlinkLogicalCalc calc) {
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newInput);
RexTimeIndicatorMaterializer materializer =
new RexTimeIndicatorMaterializer(rexBuilder, newInput);
List<RexNode> newProjects =
program.getProjectList().stream()
.map(project -> program.expandLocalRef(project).accept(materializer))
Expand Down Expand Up @@ -379,7 +376,8 @@ private RelNode visitCorrelate(FlinkLogicalCorrelate correlate) {
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(newLeft);
RexTimeIndicatorMaterializer materializer =
new RexTimeIndicatorMaterializer(rexBuilder, newLeft);

RexNode newScanCall = newScan.getCall().accept(materializer);
newRight =
Expand Down Expand Up @@ -585,7 +583,8 @@ private RelNode visitMultiJoin(FlinkLogicalMultiJoin multiJoin) {
.flatMap(input -> RelOptUtil.getFieldTypeList(input.getRowType()).stream())
.collect(Collectors.toList());

RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(allFields);
RexTimeIndicatorMaterializer materializer =
new RexTimeIndicatorMaterializer(rexBuilder, allFields);

final RexNode newJoinFilter = multiJoin.getJoinFilter().accept(materializer);

Expand Down Expand Up @@ -789,101 +788,4 @@ private RelDataType timestamp(boolean isNullable, boolean isTimestampLtzIndicato
private boolean isTimestampLtzType(RelDataType type) {
return type.getSqlTypeName().equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
}

// ----------------------------------------------------------------------------------------
// Materializer for RexNode including time indicator
// ----------------------------------------------------------------------------------------

private class RexTimeIndicatorMaterializer extends RexShuttle {

private final List<RelDataType> inputFieldTypes;

private RexTimeIndicatorMaterializer(RelNode node) {
this(RelOptUtil.getFieldTypeList(node.getRowType()));
}

private RexTimeIndicatorMaterializer(List<RelDataType> inputFieldTypes) {
this.inputFieldTypes = inputFieldTypes;
}

@Override
public RexNode visitCall(RexCall call) {
RexCall updatedCall = (RexCall) super.visitCall(call);

// materialize operands with time indicators
List<RexNode> materializedOperands;
SqlOperator updatedCallOp = updatedCall.getOperator();
if (updatedCallOp == FlinkSqlOperatorTable.SESSION_OLD
|| updatedCallOp == FlinkSqlOperatorTable.HOP_OLD
|| updatedCallOp == FlinkSqlOperatorTable.TUMBLE_OLD) {
// skip materialization for special operators
materializedOperands = updatedCall.getOperands();
} else {
materializedOperands =
updatedCall.getOperands().stream()
.map(RelTimeIndicatorConverter.this::materializeTimeIndicators)
.collect(Collectors.toList());
}

// All calls in MEASURES and DEFINE are wrapped with FINAL/RUNNING, therefore
// we should treat FINAL(MATCH_ROWTIME) and FINAL(MATCH_PROCTIME) as a time attribute
// extraction
if (isFinalOnMatchTimeIndicator(call)) {
return updatedCall;
} else if (isTimeIndicatorType(updatedCall.getType())) {
// do not modify window time attributes and some special operators
if (updatedCallOp == FlinkSqlOperatorTable.TUMBLE_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.TUMBLE_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.HOP_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.HOP_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.SESSION_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.SESSION_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.MATCH_ROWTIME
|| updatedCallOp == FlinkSqlOperatorTable.MATCH_PROCTIME
|| updatedCallOp == FlinkSqlOperatorTable.PROCTIME
|| updatedCallOp == SqlStdOperatorTable.AS
|| updatedCallOp == SqlStdOperatorTable.CAST
|| updatedCallOp == FlinkSqlOperatorTable.REINTERPRET) {
return updatedCall;
} else {
// materialize function's result and operands
return updatedCall.clone(
timestamp(
updatedCall.getType().isNullable(),
isTimestampLtzType(updatedCall.getType())),
materializedOperands);
}
} else {
// materialize function's operands only
return updatedCall.clone(updatedCall.getType(), materializedOperands);
}
}

@Override
public RexNode visitInputRef(RexInputRef inputRef) {
RelDataType oldType = inputRef.getType();
if (isTimeIndicatorType(oldType)) {
RelDataType resolvedRefType = inputFieldTypes.get(inputRef.getIndex());
if (!isTimeIndicatorType(resolvedRefType)) {
// input has been materialized
return new RexInputRef(inputRef.getIndex(), resolvedRefType);
}
}
return super.visitInputRef(inputRef);
}

@Override
public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) {
RelDataType oldType = fieldRef.getType();
if (isTimeIndicatorType(oldType)) {
RelDataType resolvedRefType = inputFieldTypes.get(fieldRef.getIndex());
if (!isTimeIndicatorType(resolvedRefType)) {
// input has been materialized
return new RexPatternFieldRef(
fieldRef.getAlpha(), fieldRef.getIndex(), resolvedRefType);
}
}
return super.visitPatternFieldRef(fieldRef);
}
}
}
Loading