Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(),
oldTable -> op.getTableChanges(),
suspendMaterializedTable);
suspendMaterializedTable,
op.getSinkModifyQuery());
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);

Expand Down Expand Up @@ -1010,7 +1011,10 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(

AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, oldTable -> tableChanges, oldMaterializedTable);
tableIdentifier,
oldTable -> tableChanges,
oldMaterializedTable,
op.getSinkModifyQuery());

operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
Expand All @@ -1029,7 +1033,10 @@ private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedT
AlterMaterializedTableChangeOperation op) {

return new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(), oldTable -> List.of(), oldMaterializedTable);
op.getTableIdentifier(),
oldTable -> List.of(),
oldMaterializedTable,
op.getSinkModifyQuery());
}

private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ private ResultFetcher executeOperationInStatementSetState(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation operation) {
if (operation instanceof EndStatementSetOperation) {
return callEndStatementSetOperation(tableEnv, handle);
} else if (operation instanceof ModifyOperation) {
} else if (operation instanceof ModifyOperation
&& !(operation instanceof MaterializedTableOperation)) {
sessionContext.addStatementSetOperation((ModifyOperation) operation);
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
} else {
Expand All @@ -518,7 +519,7 @@ private ResultFetcher executeOperation(
} else if (op instanceof EndStatementSetOperation) {
throw new SqlExecutionException(
"No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'.");
} else if (op instanceof ModifyOperation) {
} else if (op instanceof ModifyOperation && !(op instanceof MaterializedTableOperation)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason MaterializedTableOperation is not a ModifyOperation

however after that we call callModifyOperations ?

return callModifyOperations(
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
} else if (op instanceof CompileAndExecutePlanOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand Down Expand Up @@ -62,6 +63,7 @@
import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.executeStatement;
import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getContinuousRefreshHandler;
import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getTable;
import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.verifyExplainPlan;
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning;
Expand Down Expand Up @@ -293,6 +295,39 @@ void testConvertTableLeftSuspendedWhenRefreshJobFailsToStart() throws Exception
assertThat(suspendedMaterializedTable.getRefreshStatus()).isSameAs(RefreshStatus.SUSPENDED);
}

@Test
void testExplainConvertTableToMaterializedTable() throws Exception {
// pre-create a regular table that CREATE OR ALTER would convert in place.
String createRegularTableDDL =
"CREATE TABLE users_shops (\n"
+ " user_id BIGINT,\n"
+ " shop_id BIGINT,\n"
+ " payment_amount_cents BIGINT\n"
+ ")\n"
+ "WITH ('connector' = 'values')";
OperationHandle createTableHandle =
executeStatement(service, sessionHandle, createRegularTableDDL);
awaitOperationTermination(service, sessionHandle, createTableHandle);

String explainDDL =
"EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops"
+ " FRESHNESS = INTERVAL '30' SECOND"
+ " AS SELECT user_id, shop_id, payment_amount_cents FROM datagenSource";

verifyExplainPlan(
service,
sessionHandle,
fileSystemCatalogName,
explainDDL,
"/explain/testExplainConvertTableToMaterializedTable.out");

// EXPLAIN is side-effect-free: the entry is still a regular table, not converted.
assertThat(
service.getTable(sessionHandle, getObjectIdentifier("users_shops"))
.getTableKind())
.isSameAs(CatalogBaseTable.TableKind.TABLE);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use static import if it requires more than 2 names

}

private SessionHandle initializeSession() {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
String catalogDDL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,78 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
dropMaterializedTable(getObjectIdentifier("my_materialized_table"));
}

@Test
void testExplainCreateOrAlterMaterializedTableOnExistingTable() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", List.of(), Map.of(), RefreshMode.FULL);
ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops");
ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier);

// CREATE OR ALTER on an existing table behaves like an alter-with-query. Re-state the same
// definition so EXPLAIN plans the sink over the (unchanged) table.
String explainDDL =
"EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)"
+ " WITH ('format' = 'debezium-json')"
+ " FRESHNESS = INTERVAL '30' SECOND"
+ " REFRESH_MODE = FULL"
+ " AS SELECT"
+ " user_id,"
+ " shop_id,"
+ " ds,"
+ " COUNT(order_id) AS order_cnt"
+ " FROM ("
+ " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+ " ) AS tmp"
+ " GROUP BY (user_id, shop_id, ds)";

verifyExplainPlan(
explainDDL,
"/explain/testExplainCreateOrAlterMaterializedTableOnExistingTable.out");

// EXPLAIN is side-effect-free: the existing materialized table is unchanged.
ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier);
assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema());
assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());

dropMaterializedTable(userShopsIdentifier);
}

@Test
void testExplainAlterMaterializedTableAsQuery() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", List.of(), Map.of(), RefreshMode.FULL);
ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops");
ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier);

String explainDDL =
"EXPLAIN ALTER MATERIALIZED TABLE users_shops"
+ " AS SELECT"
+ " user_id,"
+ " shop_id,"
+ " ds,"
+ " COUNT(order_id) AS order_cnt"
+ " FROM ("
+ " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+ " ) AS tmp"
+ " GROUP BY (user_id, shop_id, ds)";

verifyExplainPlan(explainDDL, "/explain/testExplainAlterMaterializedTableAsQuery.out");

// EXPLAIN is side-effect-free: the materialized table definition is unchanged.
ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier);
assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema());
assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());

dropMaterializedTable(userShopsIdentifier);
}

private void verifyExplainPlan(String explainStatement, String expectedResource)
throws Exception {
MaterializedTableTestUtils.verifyExplainPlan(
service, sessionHandle, fileSystemCatalogName, explainStatement, expectedResource);
}

private void setupSavepointDir(Path temporaryPath) throws Exception {
String savepointDir = "file://" + temporaryPath.toAbsolutePath();
String setupSavepointDDL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;

import java.io.IOException;
import java.util.List;

import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
import static org.assertj.core.api.Assertions.assertThat;

/** Helpers shared between the materialized table gateway ITCases. */
final class MaterializedTableTestUtils {
Expand All @@ -50,4 +57,26 @@ static ContinuousRefreshHandler getContinuousRefreshHandler(
return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
resolvedTable.getSerializedRefreshHandler(), classLoader);
}

/**
* Runs an EXPLAIN statement through the gateway and asserts its full plan equals the given
* golden resource. The per-test catalog name is normalized to {@code $CATALOG} before
* comparison.
*/
static void verifyExplainPlan(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to create another plan verifier if there is already existing framework for this?
See TableTestBase

SqlGatewayService service,
SessionHandle sessionHandle,
String catalogName,
String explainStatement,
String expectedResource)
throws Exception {
OperationHandle handle = executeStatement(service, sessionHandle, explainStatement);
awaitOperationTermination(service, sessionHandle, handle);
List<RowData> results = fetchAllResults(service, sessionHandle, handle);
assertThat(results).hasSize(1);
String actual =
TableTestUtil.replaceStageId(
results.get(0).getString(0).toString().replace(catalogName, "$CATALOG"));
assertThat(actual).isEqualTo(TableTestUtil.readFromResource(expectedResource));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
== Abstract Syntax Tree ==
LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, ds, order_cnt])
+- LogicalAggregate(group=[{0, 1, 2}], order_cnt=[COUNT($3)])
+- LogicalProject(user_id=[$1], shop_id=[$2], ds=[$3], order_id=[$0])
+- LogicalTableScan(table=[[$CATALOG, test_db, my_source]])

== Optimized Physical Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt])
+- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt])
+- Exchange(distribution=[hash[user_id, shop_id, order_created_at]])
+- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at])

== Optimized Execution Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt])
+- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt])
+- Exchange(distribution=[hash[user_id, shop_id, order_created_at]])
+- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at])
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
== Abstract Syntax Tree ==
LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents])
+- LogicalProject(user_id=[$2], shop_id=[$3], payment_amount_cents=[$8])
+- LogicalTableScan(table=[[$CATALOG, test_db, datagenSource]])

== Optimized Physical Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents])
+- Calc(select=[user_id, shop_id, payment_amount_cents])
+- TableSourceScan(table=[[$CATALOG, test_db, datagenSource]], fields=[order_id, order_number, user_id, shop_id, product_id, status, order_type, order_created_at, payment_amount_cents])

== Optimized Execution Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, payment_amount_cents])
+- Calc(select=[user_id, shop_id, payment_amount_cents])
+- TableSourceScan(table=[[$CATALOG, test_db, datagenSource]], fields=[order_id, order_number, user_id, shop_id, product_id, status, order_type, order_created_at, payment_amount_cents])
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
== Abstract Syntax Tree ==
LogicalSink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, ds, order_cnt])
+- LogicalAggregate(group=[{0, 1, 2}], order_cnt=[COUNT($3)])
+- LogicalProject(user_id=[$1], shop_id=[$2], ds=[$3], order_id=[$0])
+- LogicalTableScan(table=[[$CATALOG, test_db, my_source]])

== Optimized Physical Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt])
+- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt])
+- Exchange(distribution=[hash[user_id, shop_id, order_created_at]])
+- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at])

== Optimized Execution Plan ==
Sink(table=[$CATALOG.test_db.users_shops], fields=[user_id, shop_id, order_created_at, order_cnt])
+- GroupAggregate(groupBy=[user_id, shop_id, order_created_at], select=[user_id, shop_id, order_created_at, COUNT(order_id) AS order_cnt])
+- Exchange(distribution=[hash[user_id, shop_id, order_created_at]])
+- TableSourceScan(table=[[$CATALOG, test_db, my_source]], fields=[order_id, user_id, shop_id, order_created_at])
Original file line number Diff line number Diff line change
Expand Up @@ -3138,16 +3138,25 @@ SqlNode SqlRichExplain() :
stmt = RichSqlInsert()
|
stmt = SqlCreate()
|
stmt = SqlAlterMaterializedTable()
)
{
if ((stmt instanceof SqlCreate)
&& !(stmt instanceof SqlCreateTableAs)
&& !(stmt instanceof SqlReplaceTableAs)) {
&& !(stmt instanceof SqlReplaceTableAs)
&& !(stmt instanceof SqlCreateOrAlterMaterializedTable)) {
throw SqlUtil.newContextException(
getPos(),
ParserResource.RESOURCE.explainCreateOrReplaceStatementUnsupported());
}

if ((stmt instanceof SqlAlterMaterializedTable) && !(stmt instanceof SqlAlterMaterializedTableAsQuery)) {
throw SqlUtil.newContextException(
getPos(),
ParserResource.RESOURCE.explainAlterMaterializedTableUnsupported());
}

return new SqlRichExplain(getPos(), stmt, explainDetails);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public interface ParserResource {
"Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
Resources.ExInst<ParseException> explainCreateOrReplaceStatementUnsupported();

@Resources.BaseMessage(
"Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN. The statement must define a query using the AS clause.")
Resources.ExInst<ParseException> explainAlterMaterializedTableUnsupported();

@Resources.BaseMessage(
"Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.")
Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3170,29 +3170,40 @@ void testAnalyzeTable() {
.fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
}

@Test
void testExplainCreateTableNoSupported() {
this.sql("EXPLAIN CREATE TABLE t (id int^)^")
@ParameterizedTest
@CsvSource({"CREATE", "CREATE OR REPLACE"})
void testExplainCreateTableNotSupported(final String operation) {
this.sql(String.format("EXPLAIN %s TABLE t (id int^)^", operation))
.fails(
"Unsupported CREATE OR REPLACE statement for EXPLAIN\\. The statement must define a query using the AS clause \\(i\\.e\\. CTAS/RTAS statements\\)\\.");
}

@Test
void testExplainCreateTableAsSelect() {
this.sql("EXPLAIN CREATE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN CREATE TABLE `T`\nAS\nSELECT *\nFROM `B`");
}

@Test
void testExplainCreateOrReplaceTableAsSelect() {
this.sql("EXPLAIN CREATE OR REPLACE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
@ParameterizedTest
@CsvSource({
"ALTER MATERIALIZED TABLE t ^SUSPEND^",
"ALTER MATERIALIZED TABLE t ^RESUME^",
"ALTER MATERIALIZED TABLE t ADD category_id STRING METADATA ^VIRTUAL^",
"ALTER MATERIALIZED TABLE t MODIFY measurement double ^METADATA^"
})
void testExplainAlterMaterializedTableWithoutAsQueryNotSupported(
final String alterMaterializedTable) {
this.sql(String.format("EXPLAIN %s", alterMaterializedTable))
.fails(
"Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN\\. The statement must define a query using the AS clause.");
}

@Test
void testExplainReplaceTableAsSelect() {
this.sql("EXPLAIN REPLACE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
@ParameterizedTest
@CsvSource({
"CREATE",
"CREATE OR REPLACE",
"REPLACE",
"CREATE MATERIALIZED",
"ALTER MATERIALIZED",
"CREATE OR ALTER MATERIALIZED",
})
void testExplain(final String operation) {
this.sql(String.format("EXPLAIN %s TABLE t AS SELECT * FROM b", operation))
.ok(String.format("EXPLAIN %s TABLE `T`\nAS\nSELECT *\nFROM `B`", operation));
}

@Test
Expand Down
Loading