From 7a0b4850acaa0ce7e761e61ed844a763a93e47a4 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 16:11:00 +0200 Subject: [PATCH 1/5] [FLINK-39957][table] Support EXPLAIN of CREATE/ALTER MATERIALIZED TABLE in the SQL parser --- .../src/main/codegen/includes/parserImpls.ftl | 11 ++++- .../sql/parser/utils/ParserResource.java | 4 ++ .../sql/parser/FlinkSqlParserImplTest.java | 45 ++++++++++++------- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index aef621ddc7db2..2b8250ea98ed9 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -3138,16 +3138,25 @@ SqlNode SqlRichExplain() : stmt = RichSqlInsert() | stmt = SqlCreate() + | + stmt = SqlAlterMaterializedTable() ) { if ((stmt instanceof SqlCreate) && !(stmt instanceof SqlCreateTableAs) - && !(stmt instanceof SqlReplaceTableAs)) { + && !(stmt instanceof SqlReplaceTableAs) + && !(stmt instanceof SqlCreateOrAlterMaterializedTable)) { throw SqlUtil.newContextException( getPos(), ParserResource.RESOURCE.explainCreateOrReplaceStatementUnsupported()); } + if ((stmt instanceof SqlAlterMaterializedTable) && !(stmt instanceof SqlAlterMaterializedTableAsQuery)) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.explainAlterMaterializedTableUnsupported()); + } + return new SqlRichExplain(getPos(), stmt, explainDetails); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index d459ca28dc8fd..7dade5ead9611 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -53,6 +53,10 @@ public interface ParserResource { "Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).") Resources.ExInst explainCreateOrReplaceStatementUnsupported(); + @Resources.BaseMessage( + "Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN. The statement must define a query using the AS clause.") + Resources.ExInst explainAlterMaterializedTableUnsupported(); + @Resources.BaseMessage( "Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.") Resources.ExInst columnsIdentifiersUnsupported(); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index b5ea58c8ea2d2..bdd7ee919b039 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -3170,29 +3170,40 @@ void testAnalyzeTable() { .fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*"); } - @Test - void testExplainCreateTableNoSupported() { - this.sql("EXPLAIN CREATE TABLE t (id int^)^") + @ParameterizedTest + @CsvSource({"CREATE", "CREATE OR REPLACE"}) + void testExplainCreateTableNotSupported(final String operation) { + this.sql(String.format("EXPLAIN %s TABLE t (id int^)^", operation)) .fails( "Unsupported CREATE OR REPLACE statement for EXPLAIN\\. The statement must define a query using the AS clause \\(i\\.e\\. CTAS/RTAS statements\\)\\."); } - @Test - void testExplainCreateTableAsSelect() { - this.sql("EXPLAIN CREATE TABLE t AS SELECT * FROM b") - .ok("EXPLAIN CREATE TABLE `T`\nAS\nSELECT *\nFROM `B`"); - } - - @Test - void testExplainCreateOrReplaceTableAsSelect() { - this.sql("EXPLAIN CREATE OR REPLACE TABLE t AS SELECT * FROM b") - .ok("EXPLAIN CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`"); + @ParameterizedTest + @CsvSource({ + "ALTER MATERIALIZED TABLE t ^SUSPEND^", + "ALTER MATERIALIZED TABLE t ^RESUME^", + "ALTER MATERIALIZED TABLE t ADD category_id STRING METADATA ^VIRTUAL^", + "ALTER MATERIALIZED TABLE t MODIFY measurement double ^METADATA^" + }) + void testExplainAlterMaterializedTableWithoutAsQueryNotSupported( + final String alterMaterializedTable) { + this.sql(String.format("EXPLAIN %s", alterMaterializedTable)) + .fails( + "Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN\\. The statement must define a query using the AS clause."); } - @Test - void testExplainReplaceTableAsSelect() { - this.sql("EXPLAIN REPLACE TABLE t AS SELECT * FROM b") - .ok("EXPLAIN REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`"); + @ParameterizedTest + @CsvSource({ + "CREATE", + "CREATE OR REPLACE", + "REPLACE", + "CREATE MATERIALIZED", + "ALTER MATERIALIZED", + "CREATE OR ALTER MATERIALIZED", + }) + void testExplain(final String operation) { + this.sql(String.format("EXPLAIN %s TABLE t AS SELECT * FROM b", operation)) + .ok(String.format("EXPLAIN %s TABLE `T`\nAS\nSELECT *\nFROM `B`", operation)); } @Test From cf2643ae31033d28a93d82f614ddd8279f74efa2 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 16:11:00 +0200 Subject: [PATCH 2/5] [FLINK-39957][table] Make materialized table CREATE/ALTER operations translatable as ModifyOperation --- .../operations/ModifyOperationVisitor.java | 6 ++++ ...lterMaterializedTableAsQueryOperation.java | 6 ++-- ...AlterMaterializedTableChangeOperation.java | 33 +++++++++++++++++-- .../CreateMaterializedTableOperation.java | 32 ++++++++++++++---- .../FullAlterMaterializedTableOperation.java | 6 ++-- ...edTableAsQueryOperationValidationTest.java | 2 +- 6 files changed, 71 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java index 006f042b89308..573d1619d2de8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java @@ -19,6 +19,8 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; /** * Class that implements visitor pattern. It allows type safe logic on top of tree of {@link @@ -39,4 +41,8 @@ public interface ModifyOperationVisitor { T visit(CreateTableASOperation ctas); T visit(ReplaceTableAsOperation rtas); + + T visit(CreateMaterializedTableOperation createMaterializedTableOperation); + + T visit(AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java index 264fc752bfa2b..9da8302ca6042 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.QueryOperation; import java.util.List; import java.util.function.Function; @@ -40,8 +41,9 @@ public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTab public AlterMaterializedTableAsQueryOperation( ObjectIdentifier tableIdentifier, Function> tableChangesForTable, - ResolvedCatalogMaterializedTable oldTable) { - super(tableIdentifier, tableChangesForTable, oldTable); + ResolvedCatalogMaterializedTable oldTable, + QueryOperation sinkModifyQuery) { + super(tableIdentifier, tableChangesForTable, oldTable, sinkModifyQuery); } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index 684518e99b096..87d588a920da5 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -37,6 +37,9 @@ import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler; import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus; import org.apache.flink.table.catalog.TableChange.ModifyStartMode; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.ModifyOperationVisitor; +import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import java.util.ArrayList; @@ -50,9 +53,11 @@ * Alter materialized table with new table definition and table changes represents the modification. */ @Internal -public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation { +public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation + implements ModifyOperation { private final Function> tableChangeForTable; + private final QueryOperation sinkModifyQuery; private ResolvedCatalogMaterializedTable oldTable; private MaterializedTableChangeHandler handler; private CatalogMaterializedTable newTable; @@ -62,9 +67,23 @@ public AlterMaterializedTableChangeOperation( ObjectIdentifier tableIdentifier, Function> tableChangeForTable, ResolvedCatalogMaterializedTable oldTable) { + // Metadata-only changes (options, schema, distribution, ...) carry no sink-modifying query. + this(tableIdentifier, tableChangeForTable, oldTable, null); + } + + public AlterMaterializedTableChangeOperation( + ObjectIdentifier tableIdentifier, + Function> tableChangeForTable, + ResolvedCatalogMaterializedTable oldTable, + QueryOperation sinkModifyQuery) { super(tableIdentifier); this.tableChangeForTable = tableChangeForTable; this.oldTable = oldTable; + this.sinkModifyQuery = sinkModifyQuery; + } + + public QueryOperation getSinkModifyQuery() { + return sinkModifyQuery; } public List getTableChanges() { @@ -76,7 +95,7 @@ public List getTableChanges() { public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() { return new AlterMaterializedTableChangeOperation( - tableIdentifier, tableChangeForTable, oldTable); + tableIdentifier, tableChangeForTable, oldTable, sinkModifyQuery); } public CatalogMaterializedTable getNewTable() { @@ -144,6 +163,16 @@ public String asSummaryString() { "%s %s\n%s", getOperationName(), tableIdentifier.asSummaryString(), changes); } + @Override + public QueryOperation getChild() { + return this.sinkModifyQuery; + } + + @Override + public T accept(final ModifyOperationVisitor visitor) { + return visitor.visit(this); + } + /** Hook for subclasses to provide a different new-table builder. */ protected CatalogMaterializedTable computeNewTable() { return MaterializedTableChangeHandler.buildNewMaterializedTable(getHandlerWithChanges()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java index 9fb68dc71b00e..ff425fe53aa6c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java @@ -23,26 +23,33 @@ import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.ModifyOperationVisitor; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.ddl.CreateOperation; -import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** Operation to describe a CREATE MATERIALIZED TABLE statement. */ @Internal public class CreateMaterializedTableOperation - implements CreateOperation, MaterializedTableOperation { + implements CreateOperation, MaterializedTableOperation, ModifyOperation { private final ObjectIdentifier tableIdentifier; private final ResolvedCatalogMaterializedTable materializedTable; + private final QueryOperation sinkModifyingQuery; public CreateMaterializedTableOperation( - ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) { + ObjectIdentifier tableIdentifier, + ResolvedCatalogMaterializedTable materializedTable, + QueryOperation sinkModifyQuery) { this.tableIdentifier = tableIdentifier; this.materializedTable = materializedTable; + this.sinkModifyingQuery = sinkModifyQuery; } @Override @@ -60,6 +67,10 @@ public ResolvedCatalogMaterializedTable getCatalogMaterializedTable() { return materializedTable; } + public QueryOperation getSinkModifyingQuery() { + return sinkModifyingQuery; + } + @Override public String asSummaryString() { Map params = new LinkedHashMap<>(); @@ -67,9 +78,16 @@ public String asSummaryString() { params.put("identifier", tableIdentifier); return OperationUtils.formatWithChildren( - "CREATE MATERIALIZED TABLE", - params, - Collections.emptyList(), - Operation::asSummaryString); + "CREATE MATERIALIZED TABLE", params, List.of(), Operation::asSummaryString); + } + + @Override + public QueryOperation getChild() { + return this.sinkModifyingQuery; + } + + @Override + public T accept(final ModifyOperationVisitor visitor) { + return visitor.visit(this); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java index ade473ddeacc2..b524e0c0cbf19 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.QueryOperation; import java.util.List; import java.util.function.Function; @@ -42,8 +43,9 @@ public FullAlterMaterializedTableOperation( final Function> tableChangeForTable, final ResolvedCatalogMaterializedTable oldTable, final Function - newTableBuilder) { - super(tableIdentifier, tableChangeForTable, oldTable); + newTableBuilder, + final QueryOperation sinkModifyQuery) { + super(tableIdentifier, tableChangeForTable, oldTable, sinkModifyQuery); this.newTableBuilder = newTableBuilder; } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java index 8a513278bf7f1..88073daca8bd1 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java @@ -194,6 +194,6 @@ void acceptQueryOnlyChange() { private static AlterMaterializedTableAsQueryOperation operation( ResolvedCatalogMaterializedTable oldTable, List changes) { return new AlterMaterializedTableAsQueryOperation( - ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, oldTable); + ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, oldTable, null); } } From fd4a3dc523d268146c45ef7bf7e304c4c69d36b6 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 16:11:00 +0200 Subject: [PATCH 3/5] [FLINK-39957][table-planner] Translate materialized table CREATE/ALTER to a sink RelNode for EXPLAIN --- .../planner/connectors/DynamicSinkUtils.java | 30 ++++++ .../SqlNodeToOperationConversion.java | 8 +- .../converters/SqlNodeConvertUtils.java | 28 +++++ ...lterMaterializedTableAsQueryConverter.java | 5 +- ...eateOrAlterMaterializedTableConverter.java | 14 ++- .../planner/delegation/PlannerBase.scala | 49 ++++++++- ...izedTableNodeToOperationConverterTest.java | 42 ++++++++ .../resources/explain/testExplainAlterMT.out | 14 +++ .../resources/explain/testExplainCreateMT.out | 14 +++ .../explain/testExplainCreateOrAlterMT.out | 14 +++ .../explain/testExplainFullAlterMT.out | 12 +++ .../plan/stream/sql/TableSinkTest.scala | 100 ++++++++++++++++++ 12 files changed, 323 insertions(+), 7 deletions(-) create mode 100644 flink-table/flink-table-planner/src/test/resources/explain/testExplainAlterMT.out create mode 100644 flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateMT.out create mode 100644 flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateOrAlterMT.out create mode 100644 flink-table/flink-table-planner/src/test/resources/explain/testExplainFullAlterMT.out diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 860b6131da0b7..e3a0a6b73e782 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -258,6 +258,36 @@ public static RelNode convertCreateTableAsToRel( null); // conflictStrategy } + /** + * Converts the {@code AS} query of a materialized table (CREATE / CREATE OR ALTER / ALTER ... + * AS) into a {@link RelNode} that writes into the table, adding helper projections if + * necessary. The caller resolves the target table to its {@link ResolvedCatalogTable} form (the + * new definition for an alter, the created definition for a create). + */ + public static RelNode convertMaterializedTableAsToRel( + FlinkRelBuilder relBuilder, + RelNode input, + Catalog catalog, + ObjectIdentifier identifier, + ResolvedCatalogTable resolvedTable, + Map staticPartitions, + boolean isOverwrite, + DynamicTableSink sink) { + final ContextResolvedTable contextResolvedTable = + ContextResolvedTable.permanent(identifier, catalog, resolvedTable); + + return convertSinkToRel( + relBuilder, + input, + Collections.emptyMap(), + contextResolvedTable, + staticPartitions, + null, + isOverwrite, + sink, + null); // conflictStrategy + } + private static RelNode convertSinkToRel( FlinkRelBuilder relBuilder, RelNode input, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index eb29049dfb209..074309ab9f4b7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -32,6 +32,8 @@ import org.apache.flink.sql.parser.ddl.SqlUseModules; import org.apache.flink.sql.parser.ddl.catalog.SqlDropCatalog; import org.apache.flink.sql.parser.ddl.catalog.SqlUseCatalog; +import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable; import org.apache.flink.sql.parser.ddl.table.SqlCreateTableAs; import org.apache.flink.sql.parser.ddl.table.SqlDropTable; import org.apache.flink.sql.parser.ddl.view.SqlDropView; @@ -584,8 +586,10 @@ private Operation convertRichExplain(SqlRichExplain sqlExplain) { } } else if (sqlNode.getKind().belongsTo(SqlKind.QUERY)) { operation = convertSqlQuery(sqlExplain.getStatement()); - } else if ((sqlNode instanceof SqlCreateTableAs) - || (sqlNode instanceof SqlReplaceTableAs)) { + } else if (sqlNode instanceof SqlCreateTableAs + || sqlNode instanceof SqlReplaceTableAs + || sqlNode instanceof SqlCreateMaterializedTable + || sqlNode instanceof SqlAlterMaterializedTableAsQuery) { operation = convert(flinkPlanner, catalogManager, sqlNode) .orElseThrow( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java index f2a6e2595a1a0..1b5d46e8a3064 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.sql.parser.ddl.view.SqlAlterView; import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; @@ -30,8 +31,11 @@ import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.utils.ValidationUtils; +import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.operations.PlannerQueryOperation; +import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext; import org.apache.flink.shaded.guava33.com.google.common.base.Splitter; @@ -108,6 +112,30 @@ static PlannerQueryOperation toQueryOperation(SqlNode validated, ConvertContext relational.project(), () -> context.toQuotedSqlString(validated)); } + /** + * Validates the given materialized table {@code AS} query and converts it into a {@link + * PlannerQueryOperation}. Shared by the CREATE / CREATE OR ALTER / ALTER ... AS converters. + */ + public static PlannerQueryOperation validateAndConvertAsQuery( + SqlNode asQuery, ConvertContext context) { + final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner(); + final SqlNode validated = flinkPlanner.validate(asQuery); + final Operation operation = + SqlNodeToOperationConversion.convert( + flinkPlanner, context.getCatalogManager(), validated) + .orElseThrow( + () -> + new TableException( + "Unsupported materialized table definition query: " + + validated.getClass().getSimpleName())); + if (!(operation instanceof PlannerQueryOperation)) { + throw new TableException( + "Expected a query for the materialized table definition but got: " + + operation.getClass().getSimpleName()); + } + return (PlannerQueryOperation) operation; + } + /** convert the query part of a VIEW statement into a {@link CatalogView}. */ static CatalogView toCatalogView( SqlNode query, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java index 3860a3f93c967..de32488a4948f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java @@ -46,8 +46,11 @@ protected Operation convertToOperation( ResolvedCatalogMaterializedTable oldTable, ConvertContext context) { final ObjectIdentifier identifier = resolveIdentifier(sqlAlterTableAsQuery, context); + final PlannerQueryOperation query = + SqlNodeConvertUtils.validateAndConvertAsQuery( + sqlAlterTableAsQuery.getAsQuery(), context); return new AlterMaterializedTableAsQueryOperation( - identifier, gatherTableChanges(sqlAlterTableAsQuery, context), oldTable); + identifier, gatherTableChanges(sqlAlterTableAsQuery, context), oldTable, query); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index eff024de3d154..e46b7d614bdd1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -45,7 +45,9 @@ import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.MaterializedTableChangeHandler; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.operations.converters.MergeTableAsUtil; +import org.apache.flink.table.planner.operations.converters.SqlNodeConvertUtils; import org.apache.flink.table.planner.utils.MaterializedTableUtils; import org.apache.calcite.sql.SqlNode; @@ -121,11 +123,15 @@ private Operation handleAlter( final ObjectIdentifier identifier) { final SchemaResolver schemaResolver = context.getCatalogManager().getSchemaResolver(); final MergeContext mergeContext = getMergeContext(sqlCreateOrAlterTable, context); + final PlannerQueryOperation query = + SqlNodeConvertUtils.validateAndConvertAsQuery( + sqlCreateOrAlterTable.getAsQuery(), context); return new FullAlterMaterializedTableOperation( identifier, currentTable -> buildTableChanges(currentTable, mergeContext, schemaResolver), oldTable, - currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver)); + currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver), + query); } private Operation handleConvert( @@ -237,8 +243,10 @@ private Operation handleCreate( final ObjectIdentifier identifier) { final ResolvedCatalogMaterializedTable resolvedTable = getResolvedCatalogMaterializedTable(sqlCreateOrAlterTable, context); - - return new CreateMaterializedTableOperation(identifier, resolvedTable); + final PlannerQueryOperation query = + SqlNodeConvertUtils.validateAndConvertAsQuery( + sqlCreateOrAlterTable.getAsQuery(), context); + return new CreateMaterializedTableOperation(identifier, resolvedTable, query); } private List buildTableChanges( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index fe17d1f29d822..e05b108b4a6a7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -35,6 +35,7 @@ import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations._ import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations.ddl.CreateTableOperation +import org.apache.flink.table.operations.materializedtable.{AlterMaterializedTableChangeOperation, CreateMaterializedTableOperation} import org.apache.flink.table.planner.JMap import org.apache.flink.table.planner.calcite._ import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema @@ -286,7 +287,31 @@ abstract class PlannerBase( rtasOperation.getCreateTableOperation, rtasOperation.getChild, Collections.emptyMap(), - false) + isOverwrite = false) + + case createMtOperation: CreateMaterializedTableOperation => + convertMaterializedTableToRel( + createMtOperation.getTableIdentifier, + createMtOperation.getCatalogMaterializedTable.toResolvedCatalogTable, + createMtOperation.getChild, + Collections.emptyMap() + ) + + case alterMtOperation: AlterMaterializedTableChangeOperation + if alterMtOperation.getSinkModifyQuery != null => + val newTable = + catalogManager.resolveCatalogMaterializedTable(alterMtOperation.getNewTable) + // An alter writes to the existing backing storage, so carry over the old table's resolved + // connector options (e.g. the managed `path` the catalog injects at create time) that the + // rebuilt new table does not retain. New options win on conflict. + val sinkOptions = + new util.HashMap[String, String](alterMtOperation.getOldTable.getOptions) + sinkOptions.putAll(newTable.getOptions) + convertMaterializedTableToRel( + alterMtOperation.getTableIdentifier, + newTable.toResolvedCatalogTable.copy(sinkOptions), + alterMtOperation.getChild, + Collections.emptyMap()) case stagedSink: StagedSinkModifyOperation => val input = createRelBuilder.queryOperation(modifyOperation.getChild).build() @@ -558,6 +583,28 @@ abstract class PlannerBase( tableSink); } + private def convertMaterializedTableToRel( + identifier: ObjectIdentifier, + resolvedTable: ResolvedCatalogTable, + queryOperation: QueryOperation, + staticPartitions: JMap[String, String]): RelNode = { + val input = createRelBuilder.queryOperation(queryOperation).build() + val catalog = toScala(catalogManager.getCatalog(identifier.getCatalogName)) + + val tableSink = + createDynamicTableSink(identifier, catalog, resolvedTable, isTemporary = false) + + DynamicSinkUtils.convertMaterializedTableAsToRel( + createRelBuilder, + input, + catalog.orNull, + identifier, + resolvedTable, + staticPartitions, + false, + tableSink) + } + protected def createSerdeContext: SerdeContext = { val planner = createFlinkPlanner new SerdeContext( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 6e99aa29ed35a..1d2cd769da81b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; @@ -51,6 +52,7 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; import org.apache.flink.table.planner.utils.TableFunc0; import org.junit.jupiter.api.BeforeEach; @@ -757,6 +759,46 @@ void testCreateOrAlterMaterializedTable() { assertThat(materializedTable.getOrigin()).isEqualTo(expected); } + @Test + void testExplainCreateOrAlterMaterializedTable() { + final Operation operation = + parse("EXPLAIN CREATE OR ALTER MATERIALIZED TABLE myTable123 AS SELECT 123"); + + assertThat(operation).isInstanceOf(ExplainOperation.class); + assertThat(((ExplainOperation) operation).getChild()) + .isInstanceOf(CreateMaterializedTableOperation.class); + } + + @Test + void testExplainCreateMaterializedTable() { + final Operation operation = + parse("EXPLAIN CREATE MATERIALIZED TABLE myTable123 AS SELECT 123"); + + assertThat(operation).isInstanceOf(ExplainOperation.class); + assertThat(((ExplainOperation) operation).getChild()) + .isInstanceOf(CreateMaterializedTableOperation.class); + } + + @Test + void testExplainFullAlterMaterializedTable() { + // base_mtbl exists so the ALTER path is taken + final Operation operation = + parse("EXPLAIN CREATE OR ALTER MATERIALIZED TABLE base_mtbl AS SELECT 123"); + assertThat(operation).isInstanceOf(ExplainOperation.class); + assertThat(((ExplainOperation) operation).getChild()) + .isInstanceOf(FullAlterMaterializedTableOperation.class); + } + + @Test + void testExplainAlterMaterializedTable() { + final Operation operation = + parse("EXPLAIN ALTER MATERIALIZED TABLE base_mtbl AS SELECT 123"); + + assertThat(operation).isInstanceOf(ExplainOperation.class); + assertThat(((ExplainOperation) operation).getChild()) + .isInstanceOf(AlterMaterializedTableChangeOperation.class); + } + private static Collection testDataForCreateAlterMaterializedTableFailedCase() { final Collection list = new ArrayList<>(); list.addAll(createWithInvalidSchema()); diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExplainAlterMT.out b/flink-table/flink-table-planner/src/test/resources/explain/testExplainAlterMT.out new file mode 100644 index 0000000000000..65d24d635bd79 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExplainAlterMT.out @@ -0,0 +1,14 @@ +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateMT.out b/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateMT.out new file mode 100644 index 0000000000000..65d24d635bd79 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateMT.out @@ -0,0 +1,14 @@ +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateOrAlterMT.out b/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateOrAlterMT.out new file mode 100644 index 0000000000000..65d24d635bd79 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExplainCreateOrAlterMT.out @@ -0,0 +1,14 @@ +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b]) ++- Calc(select=[a, b]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExplainFullAlterMT.out b/flink-table/flink-table-planner/src/test/resources/explain/testExplainFullAlterMT.out new file mode 100644 index 0000000000000..99d05dd70186e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExplainFullAlterMT.out @@ -0,0 +1,12 @@ +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.MyMTTable], fields=[a, b, c]) ++- LogicalProject(a=[$0], b=[$1], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b, c]) ++- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.MyMTTable], fields=[a, b, c]) ++- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala index ef9776caedc9b..b31b919907727 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala @@ -929,6 +929,106 @@ class TableSinkTest extends TableTestBase { assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual)) } + @Test + def testExplainCreateMaterializedTable(): Unit = { + val actual = util.tableEnv.explainSql(""" + |CREATE MATERIALIZED TABLE MyMTTable + | WITH ( + | 'connector' = 'values' + |) AS + | SELECT + | `a`, + | `b` + | FROM + | MyTable + |""".stripMargin) + + val expected = TableTestUtil.readFromResource("/explain/testExplainCreateMT.out") + + assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual)) + } + + @Test + def testExplainCreateOrAlterMaterializedTable(): Unit = { + val actual = util.tableEnv.explainSql(""" + |CREATE OR ALTER MATERIALIZED TABLE MyMTTable + | WITH ( + | 'connector' = 'values' + |) AS + | SELECT + | `a`, + | `b` + | FROM + | MyTable + |""".stripMargin) + + val expected = TableTestUtil.readFromResource("/explain/testExplainCreateOrAlterMT.out") + + assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual)) + } + + @Test + def testExplainAlterMaterializedTable(): Unit = { + util.addTable(s""" + |CREATE OR ALTER MATERIALIZED TABLE MyMTTable + | WITH ( + | 'connector' = 'values' + |) AS + | SELECT + | `a`, + | `b` + | FROM + | MyTable + |""".stripMargin) + val actual = util.tableEnv.explainSql(""" + |ALTER MATERIALIZED TABLE MyMTTable + |AS + | SELECT + | `a`, + | `b` + | FROM + | MyTable + |""".stripMargin) + + val expected = TableTestUtil.readFromResource("/explain/testExplainAlterMT.out") + + assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual)) + + } + + @Test + def testExplainFullAlterMaterializedTable(): Unit = { + util.addTable(s""" + |CREATE OR ALTER MATERIALIZED TABLE MyMTTable + | WITH ( + | 'connector' = 'values' + |) AS + | SELECT + | `a`, + | `b` + | FROM + | MyTable + |""".stripMargin) + val actual = util.tableEnv.explainSql(""" + |CREATE OR ALTER MATERIALIZED TABLE MyMTTable + | WITH ( + | 'connector' = 'values' + |) + |AS + | SELECT + | `a`, + | `b`, + | `c` + | FROM + | MyTable + |""".stripMargin) + + val expected = TableTestUtil.readFromResource("/explain/testExplainFullAlterMT.out") + + assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual)) + + } + @Test def testInsertOnConflictDoDeduplicate(): Unit = { util.addTable(s""" From 34091c41ce84cd0099d3b19975aa5ff5996dea25 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 16:11:00 +0200 Subject: [PATCH 4/5] [FLINK-39957][sql-gateway] Route materialized table operations to the manager and add EXPLAIN ITs --- .../MaterializedTableManager.java | 13 ++- .../service/operation/OperationExecutor.java | 5 +- .../MaterializedTableStatementITCase.java | 83 +++++++++++++++++++ ...stExplainAlterMaterializedTableAsQuery.out | 17 ++++ ...rAlterMaterializedTableOnExistingTable.out | 17 ++++ 5 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 flink-table/flink-sql-gateway/src/test/resources/explain/testExplainAlterMaterializedTableAsQuery.out create mode 100644 flink-table/flink-sql-gateway/src/test/resources/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 02c646440b1ae..56cf110880ac2 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -954,7 +954,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation( new AlterMaterializedTableChangeOperation( op.getTableIdentifier(), oldTable -> op.getTableChanges(), - suspendMaterializedTable); + suspendMaterializedTable, + op.getSinkModifyQuery()); operationExecutor.callExecutableOperation( handle, alterMaterializedTableChangeOperation); @@ -1010,7 +1011,10 @@ private ResultFetcher callAlterMaterializedTableChangeOperation( AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = new AlterMaterializedTableChangeOperation( - tableIdentifier, oldTable -> tableChanges, oldMaterializedTable); + tableIdentifier, + oldTable -> tableChanges, + oldMaterializedTable, + op.getSinkModifyQuery()); operationExecutor.callExecutableOperation( handle, alterMaterializedTableChangeOperation); @@ -1029,7 +1033,10 @@ private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedT AlterMaterializedTableChangeOperation op) { return new AlterMaterializedTableChangeOperation( - op.getTableIdentifier(), oldTable -> List.of(), oldMaterializedTable); + op.getTableIdentifier(), + oldTable -> List.of(), + oldMaterializedTable, + op.getSinkModifyQuery()); } private TableChange.ModifyRefreshHandler generateResetSavepointTableChange( diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 5735abd6dee31..2bfd11eadecd4 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -494,7 +494,8 @@ private ResultFetcher executeOperationInStatementSetState( TableEnvironmentInternal tableEnv, OperationHandle handle, Operation operation) { if (operation instanceof EndStatementSetOperation) { return callEndStatementSetOperation(tableEnv, handle); - } else if (operation instanceof ModifyOperation) { + } else if (operation instanceof ModifyOperation + && !(operation instanceof MaterializedTableOperation)) { sessionContext.addStatementSetOperation((ModifyOperation) operation); return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); } else { @@ -518,7 +519,7 @@ private ResultFetcher executeOperation( } else if (op instanceof EndStatementSetOperation) { throw new SqlExecutionException( "No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'."); - } else if (op instanceof ModifyOperation) { + } else if (op instanceof ModifyOperation && !(op instanceof MaterializedTableOperation)) { return callModifyOperations( tableEnv, handle, Collections.singletonList((ModifyOperation) op)); } else if (op instanceof CompileAndExecutePlanOperation diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 134015c565da5..e124cca65a000 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -55,6 +55,7 @@ import org.apache.flink.table.gateway.workflow.WorkflowInfo; import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler; import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.table.refresh.ContinuousRefreshHandler; import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; import org.apache.flink.types.Row; @@ -1858,6 +1859,88 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E dropMaterializedTable(getObjectIdentifier("my_materialized_table")); } + @Test + void testExplainCreateOrAlterMaterializedTableOnExistingTable() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", List.of(), Map.of(), RefreshMode.FULL); + ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops"); + ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier); + + // CREATE OR ALTER on an existing table behaves like an alter-with-query. Re-state the same + // definition so EXPLAIN plans the sink over the (unchanged) table. + String explainDDL = + "EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)" + + " WITH ('format' = 'debezium-json')" + + " FRESHNESS = INTERVAL '30' SECOND" + + " REFRESH_MODE = FULL" + + " AS SELECT" + + " user_id," + + " shop_id," + + " ds," + + " COUNT(order_id) AS order_cnt" + + " FROM (" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp" + + " GROUP BY (user_id, shop_id, ds)"; + + verifyExplainPlan( + explainDDL, + "/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out"); + + // EXPLAIN is side-effect-free: the existing materialized table is unchanged. + ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier); + assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema()); + assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery()); + + dropMaterializedTable(userShopsIdentifier); + } + + @Test + void testExplainAlterMaterializedTableAsQuery() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", List.of(), Map.of(), RefreshMode.FULL); + ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops"); + ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier); + + String explainDDL = + "EXPLAIN ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT" + + " user_id," + + " shop_id," + + " ds," + + " COUNT(order_id) AS order_cnt" + + " FROM (" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp" + + " GROUP BY (user_id, shop_id, ds)"; + + verifyExplainPlan(explainDDL, "/explain/testExplainAlterMaterializedTableAsQuery.out"); + + // EXPLAIN is side-effect-free: the materialized table definition is unchanged. + ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier); + assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema()); + assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery()); + + dropMaterializedTable(userShopsIdentifier); + } + + private void verifyExplainPlan(String explainStatement, String expectedResource) + throws Exception { + OperationHandle handle = executeStatement(explainStatement); + awaitOperationTermination(service, sessionHandle, handle); + List results = fetchAllResults(service, sessionHandle, handle); + assertThat(results).hasSize(1); + // The catalog name is randomized per test, so normalize it before comparing. + String actual = + TableTestUtil.replaceStageId( + results.get(0) + .getString(0) + .toString() + .replace(fileSystemCatalogName, "$CATALOG")); + assertThat(actual).isEqualTo(TableTestUtil.readFromResource(expectedResource)); + } + private void setupSavepointDir(Path temporaryPath) throws Exception { String savepointDir = "file://" + temporaryPath.toAbsolutePath(); String setupSavepointDDL = diff --git a/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainAlterMaterializedTableAsQuery.out b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainAlterMaterializedTableAsQuery.out new file mode 100644 index 0000000000000..ec10724eeb652 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainAlterMaterializedTableAsQuery.out @@ -0,0 +1,17 @@ +== Abstract Syntax Tree == +LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, ds, order_cnt]) ++- LogicalAggregate(group=[{0, 1, 2}], order_cnt=[COUNT($3)]) + +- LogicalProject(user_id=[$1], shop_id=[$2], ds=[$3], order_id=[$0]) + +- LogicalTableScan(table=[[$CATALOG, test_db, my_source]]) + +== Optimized Physical Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt]) ++- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt]) + +- Exchange(distribution=[hash[user_id, shop_id, order_created_at]]) + +- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at]) + +== Optimized Execution Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt]) ++- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt]) + +- Exchange(distribution=[hash[user_id, shop_id, order_created_at]]) + +- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at]) diff --git a/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out new file mode 100644 index 0000000000000..ec10724eeb652 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out @@ -0,0 +1,17 @@ +== Abstract Syntax Tree == +LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, ds, order_cnt]) ++- LogicalAggregate(group=[{0, 1, 2}], order_cnt=[COUNT($3)]) + +- LogicalProject(user_id=[$1], shop_id=[$2], ds=[$3], order_id=[$0]) + +- LogicalTableScan(table=[[$CATALOG, test_db, my_source]]) + +== Optimized Physical Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt]) ++- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt]) + +- Exchange(distribution=[hash[user_id, shop_id, order_created_at]]) + +- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at]) + +== Optimized Execution Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt]) ++- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt]) + +- Exchange(distribution=[hash[user_id, shop_id, order_created_at]]) + +- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at]) From a128d529ba738a14f6b32cd1bb8402c30ded5bae Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 18:21:36 +0200 Subject: [PATCH 5/5] [FLINK-39957][table-planner] Support EXPLAIN of table-to-materialized-table conversion --- .../MaterializedTableConversionITCase.java | 35 +++++++++++++++++++ .../MaterializedTableStatementITCase.java | 15 ++------ .../service/MaterializedTableTestUtils.java | 29 +++++++++++++++ ...ExplainConvertTableToMaterializedTable.out | 14 ++++++++ .../operations/ModifyOperationVisitor.java | 3 ++ ...vertTableToMaterializedTableOperation.java | 24 +++++++++++-- ...eateOrAlterMaterializedTableConverter.java | 7 +++- .../planner/delegation/PlannerBase.scala | 20 ++++++++--- 8 files changed, 127 insertions(+), 20 deletions(-) create mode 100644 flink-table/flink-sql-gateway/src/test/resources/explain/testExplainConvertTableToMaterializedTable.out diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java index 870648e9e9694..ec46b86960109 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -62,6 +63,7 @@ import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.executeStatement; import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getContinuousRefreshHandler; import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getTable; +import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.verifyExplainPlan; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning; @@ -293,6 +295,39 @@ void testConvertTableLeftSuspendedWhenRefreshJobFailsToStart() throws Exception assertThat(suspendedMaterializedTable.getRefreshStatus()).isSameAs(RefreshStatus.SUSPENDED); } + @Test + void testExplainConvertTableToMaterializedTable() throws Exception { + // pre-create a regular table that CREATE OR ALTER would convert in place. + String createRegularTableDDL = + "CREATE TABLE users_shops (\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " payment_amount_cents BIGINT\n" + + ")\n" + + "WITH ('connector' = 'values')"; + OperationHandle createTableHandle = + executeStatement(service, sessionHandle, createRegularTableDDL); + awaitOperationTermination(service, sessionHandle, createTableHandle); + + String explainDDL = + "EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT user_id, shop_id, payment_amount_cents FROM datagenSource"; + + verifyExplainPlan( + service, + sessionHandle, + fileSystemCatalogName, + explainDDL, + "/explain/testExplainConvertTableToMaterializedTable.out"); + + // EXPLAIN is side-effect-free: the entry is still a regular table, not converted. + assertThat( + service.getTable(sessionHandle, getObjectIdentifier("users_shops")) + .getTableKind()) + .isSameAs(CatalogBaseTable.TableKind.TABLE); + } + private SessionHandle initializeSession() { SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); String catalogDDL = diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index e124cca65a000..d7b81139d3147 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -55,7 +55,6 @@ import org.apache.flink.table.gateway.workflow.WorkflowInfo; import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.table.refresh.ContinuousRefreshHandler; import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; import org.apache.flink.types.Row; @@ -1927,18 +1926,8 @@ void testExplainAlterMaterializedTableAsQuery() throws Exception { private void verifyExplainPlan(String explainStatement, String expectedResource) throws Exception { - OperationHandle handle = executeStatement(explainStatement); - awaitOperationTermination(service, sessionHandle, handle); - List results = fetchAllResults(service, sessionHandle, handle); - assertThat(results).hasSize(1); - // The catalog name is randomized per test, so normalize it before comparing. - String actual = - TableTestUtil.replaceStageId( - results.get(0) - .getString(0) - .toString() - .replace(fileSystemCatalogName, "$CATALOG")); - assertThat(actual).isEqualTo(TableTestUtil.readFromResource(expectedResource)); + MaterializedTableTestUtils.verifyExplainPlan( + service, sessionHandle, fileSystemCatalogName, explainStatement, expectedResource); } private void setupSavepointDir(Path temporaryPath) throws Exception { diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java index 66af0bd62029e..f840b8596250a 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java @@ -21,13 +21,20 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.table.refresh.ContinuousRefreshHandler; import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; import java.io.IOException; +import java.util.List; + +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; +import static org.assertj.core.api.Assertions.assertThat; /** Helpers shared between the materialized table gateway ITCases. */ final class MaterializedTableTestUtils { @@ -50,4 +57,26 @@ static ContinuousRefreshHandler getContinuousRefreshHandler( return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( resolvedTable.getSerializedRefreshHandler(), classLoader); } + + /** + * Runs an EXPLAIN statement through the gateway and asserts its full plan equals the given + * golden resource. The per-test catalog name is normalized to {@code $CATALOG} before + * comparison. + */ + static void verifyExplainPlan( + SqlGatewayService service, + SessionHandle sessionHandle, + String catalogName, + String explainStatement, + String expectedResource) + throws Exception { + OperationHandle handle = executeStatement(service, sessionHandle, explainStatement); + awaitOperationTermination(service, sessionHandle, handle); + List results = fetchAllResults(service, sessionHandle, handle); + assertThat(results).hasSize(1); + String actual = + TableTestUtil.replaceStageId( + results.get(0).getString(0).toString().replace(catalogName, "$CATALOG")); + assertThat(actual).isEqualTo(TableTestUtil.readFromResource(expectedResource)); + } } diff --git a/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainConvertTableToMaterializedTable.out b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainConvertTableToMaterializedTable.out new file mode 100644 index 0000000000000..1184984753d02 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/explain/testExplainConvertTableToMaterializedTable.out @@ -0,0 +1,14 @@ +== Abstract Syntax Tree == +LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents]) ++- LogicalProject(user_id=[$2], shop_id=[$3], payment_amount_cents=[$8]) + +- LogicalTableScan(table=[[$CATALOG, test_db, datagenSource]]) + +== Optimized Physical Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents]) ++- Calc(select=[user_id, shop_id, payment_amount_cents]) + +- TableSourceScan(table=[[$CATALOG, test_db, datagenSource]], fields=[order_id, order_number, user_id, shop_id, product_id, status, order_type, order_created_at, payment_amount_cents]) + +== Optimized Execution Plan == +Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents]) ++- Calc(select=[user_id, shop_id, payment_amount_cents]) + +- TableSourceScan(table=[[$CATALOG, test_db, datagenSource]], fields=[order_id, order_number, user_id, shop_id, product_id, status, order_type, order_created_at, payment_amount_cents]) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java index 573d1619d2de8..2fc6d9931cf75 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; +import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; /** @@ -45,4 +46,6 @@ public interface ModifyOperationVisitor { T visit(CreateMaterializedTableOperation createMaterializedTableOperation); T visit(AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation); + + T visit(ConvertTableToMaterializedTableOperation convertTableToMaterializedTableOperation); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java index 8a83ea0f8e84a..f00aef5e9f2b0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java @@ -26,8 +26,11 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.operations.ExecutableOperation; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.ModifyOperationVisitor; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; import java.util.LinkedHashMap; import java.util.List; @@ -41,24 +44,27 @@ */ @Internal public class ConvertTableToMaterializedTableOperation - implements MaterializedTableOperation, ExecutableOperation { + implements MaterializedTableOperation, ExecutableOperation, ModifyOperation { private final ObjectIdentifier tableIdentifier; private final ResolvedCatalogTable originalTable; private final ResolvedCatalogMaterializedTable materializedTable; private final Function> tableChangesForTable; + private final QueryOperation sinkModifyQuery; private List tableChanges; public ConvertTableToMaterializedTableOperation( ObjectIdentifier tableIdentifier, ResolvedCatalogTable originalTable, ResolvedCatalogMaterializedTable materializedTable, - Function> tableChangesBuilder) { + Function> tableChangesBuilder, + QueryOperation sinkModifyQuery) { this.tableIdentifier = tableIdentifier; this.originalTable = originalTable; this.materializedTable = materializedTable; this.tableChangesForTable = tableChangesBuilder; + this.sinkModifyQuery = sinkModifyQuery; } @Override @@ -81,6 +87,20 @@ public ResolvedCatalogMaterializedTable getMaterializedTable() { return materializedTable; } + public QueryOperation getSinkModifyQuery() { + return sinkModifyQuery; + } + + @Override + public QueryOperation getChild() { + return sinkModifyQuery; + } + + @Override + public T accept(ModifyOperationVisitor visitor) { + return visitor.visit(this); + } + public List getTableChanges() { if (tableChanges == null) { tableChanges = tableChangesForTable.apply(materializedTable); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index e46b7d614bdd1..3d72ff33ca8b5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -168,6 +168,10 @@ private Operation handleConvert( final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable = context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable); + final PlannerQueryOperation query = + SqlNodeConvertUtils.validateAndConvertAsQuery( + sqlCreateOrAlterMaterializedTable.getAsQuery(), context); + return new ConvertTableToMaterializedTableOperation( identifier, oldBaseTable, @@ -177,7 +181,8 @@ private Operation handleConvert( oldBaseTable, resolvedCatalogMaterializedTable, baseMergeContext.hasSchemaDefinition(), - baseMergeContext.hasConstraintDefinition())); + baseMergeContext.hasConstraintDefinition()), + query); } private List buildConversionTableChanges( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index e05b108b4a6a7..e51d069544541 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -35,7 +35,7 @@ import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations._ import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations.ddl.CreateTableOperation -import org.apache.flink.table.operations.materializedtable.{AlterMaterializedTableChangeOperation, CreateMaterializedTableOperation} +import org.apache.flink.table.operations.materializedtable.{AlterMaterializedTableChangeOperation, ConvertTableToMaterializedTableOperation, CreateMaterializedTableOperation} import org.apache.flink.table.planner.JMap import org.apache.flink.table.planner.calcite._ import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema @@ -301,9 +301,8 @@ abstract class PlannerBase( if alterMtOperation.getSinkModifyQuery != null => val newTable = catalogManager.resolveCatalogMaterializedTable(alterMtOperation.getNewTable) - // An alter writes to the existing backing storage, so carry over the old table's resolved - // connector options (e.g. the managed `path` the catalog injects at create time) that the - // rebuilt new table does not retain. New options win on conflict. + // The alter reuses the existing storage, so keep the old table's resolved connector + // options (e.g. managed `path`); new options override. val sinkOptions = new util.HashMap[String, String](alterMtOperation.getOldTable.getOptions) sinkOptions.putAll(newTable.getOptions) @@ -313,6 +312,19 @@ abstract class PlannerBase( alterMtOperation.getChild, Collections.emptyMap()) + case convertMtOperation: ConvertTableToMaterializedTableOperation + if convertMtOperation.getSinkModifyQuery != null => + // The conversion reuses the original table's storage, so keep its connector options. + val sinkOptions = + new util.HashMap[String, String](convertMtOperation.getOriginalTable.getOptions) + sinkOptions.putAll(convertMtOperation.getMaterializedTable.getOptions) + convertMaterializedTableToRel( + convertMtOperation.getTableIdentifier, + convertMtOperation.getMaterializedTable.toResolvedCatalogTable.copy(sinkOptions), + convertMtOperation.getChild, + Collections.emptyMap() + ) + case stagedSink: StagedSinkModifyOperation => val input = createRelBuilder.queryOperation(modifyOperation.getChild).build() DynamicSinkUtils.convertSinkToRel(