diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestAgg.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestAgg.java new file mode 100644 index 0000000000000..c6180d30014d8 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestAgg.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.utils; + +import org.apache.flink.table.annotation.StateHint; +import org.apache.flink.table.functions.AggregateFunction; + +/** + * Minimal {@link AggregateFunction} used by {@code function.q} to exercise {@code DESCRIBE FUNCTION + * EXTENDED} on a function whose accumulator surfaces as a {@code state:} row. The body is a no-op + * since only the introspection output is asserted. + */ +public class DescribeFunctionTestAgg extends AggregateFunction { + + /** Accumulator type for the aggregate. */ + public static class Acc { + public Long sum; + public Long count; + } + + @Override + public Acc createAccumulator() { + return new Acc(); + } + + public void accumulate(@StateHint(ttl = "2 d") Acc acc, Long value) {} + + @Override + public Long getValue(Acc accumulator) { + return 0L; + } +} diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestMinimalPtf.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestMinimalPtf.java new file mode 100644 index 0000000000000..ef66217ed438f --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestMinimalPtf.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.utils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.ArgumentTrait; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; + +/** + * Minimal Process Table Function used by {@code function.q} to exercise the {@code false} side of + * the per-PTF capability flags in {@code DESCRIBE FUNCTION EXTENDED}: no {@link + * org.apache.flink.table.annotation.StateHint StateHint} (no {@code state:} row), does not + * implement {@link org.apache.flink.table.functions.ChangelogFunction ChangelogFunction} ({@code is + * changelog function = false}), and declares no {@code onTimer} ({@code uses timers = false}). + */ +public class DescribeFunctionTestMinimalPtf extends ProcessTableFunction { + + public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {} +} diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestPtf.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestPtf.java new file mode 100644 index 0000000000000..88d6b6922cc16 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/DescribeFunctionTestPtf.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.utils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.ArgumentTrait; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.StateHint; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; + +/** + * Process Table Function used by {@code function.q} to exercise {@code DESCRIBE FUNCTION EXTENDED}. + * + *

Carries: a state entry with TTL (via {@link StateHint}), a table arg with multiple traits (via + * {@link ArgumentHint}), an {@code onTimer} method, and an implementation of {@link + * ChangelogFunction} so that the {@code emits updates} and {@code uses timers} flags both light up. + * The bodies are no-ops since only the introspection output is asserted. + */ +public class DescribeFunctionTestPtf extends ProcessTableFunction + implements ChangelogFunction { + + public void eval( + @StateHint(type = @DataTypeHint("ROW"), ttl = "1 d") Row state, + @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.OPTIONAL_PARTITION_BY}) + Row input) {} + + public void onTimer(Row state) {} + + @Override + public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { + return ChangelogMode.insertOnly(); + } +} diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q index 4cc0c043dc28c..8c5f96a2b5e45 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/function.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q @@ -502,3 +502,79 @@ describe function extended temp_upperudf; +---------------------------+-------------------------------+ 7 rows in set !ok + +# ========================================================================== +# test describe function on a process table function (PTF) +# ========================================================================== + +create temporary system function my_ptf as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestPtf'; +[INFO] Execute statement succeeded. +!info + +describe function extended my_ptf; ++---------------------------+---------------------------------------------------------------------+ +| info name | info value | ++---------------------------+---------------------------------------------------------------------+ +| is system function | true | +| is temporary | true | +| kind | PROCESS_TABLE | +| requirements | [] | +| is deterministic | true | +| supports constant folding | true | +| signature | my_ptf(input => {TABLE, SET SEMANTIC TABLE, OPTIONAL PARTITION BY}) | +| state: state | type=ROW<`count` BIGINT>, ttl=PT24H | +| accepts system arguments | true | +| is changelog function | true | +| uses timers | true | ++---------------------------+---------------------------------------------------------------------+ +11 rows in set +!ok + +# A minimal PTF demonstrates the "false" side of all three capability flags: +# no @StateHint, no ChangelogFunction implementation, no onTimer method. +create temporary system function my_minimal_ptf as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestMinimalPtf'; +[INFO] Execute statement succeeded. +!info + +describe function extended my_minimal_ptf; ++---------------------------+------------------------------------------------------+ +| info name | info value | ++---------------------------+------------------------------------------------------+ +| is system function | true | +| is temporary | true | +| kind | PROCESS_TABLE | +| requirements | [] | +| is deterministic | true | +| supports constant folding | true | +| signature | my_minimal_ptf(input => {TABLE, SET SEMANTIC TABLE}) | +| accepts system arguments | true | +| is changelog function | false | +| uses timers | false | ++---------------------------+------------------------------------------------------+ +10 rows in set +!ok + +# ========================================================================== +# test describe function on a user-defined aggregate function (accumulator +# surfaces via the same state:* row mechanism as PTFs) +# ========================================================================== + +create temporary system function my_agg as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestAgg'; +[INFO] Execute statement succeeded. +!info + +describe function extended my_agg; ++---------------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| info name | info value | ++---------------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| is system function | true | +| is temporary | true | +| kind | AGGREGATE | +| requirements | [] | +| is deterministic | true | +| supports constant folding | true | +| signature | my_agg(value => BIGINT) | +| state: acc | type=STRUCTURED<'org.apache.flink.table.client.cli.utils.DescribeFunctionTestAgg$Acc', `count` BIGINT, `sum` BIGINT>, ttl=PT48H | ++---------------------------+---------------------------------------------------------------------------------------------------------------------------------+ +8 rows in set +!ok diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java index 7fa63bdb9c7ed..bddbde6b9113f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java @@ -25,9 +25,16 @@ import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.ContextResolvedFunction; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.ChangelogFunction; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.extraction.ExtractionUtils; +import org.apache.flink.table.types.inference.StateTypeStrategy; +import org.apache.flink.table.types.inference.TypeInference; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -114,14 +121,13 @@ public TableResultInternal execute(Context ctx) { Arrays.asList( "supports constant folding", String.valueOf(definition.supportsConstantFolding()))); + final TypeInference typeInference = + definition.getTypeInference(ctx.getCatalogManager().getDataTypeFactory()); rows.add( Arrays.asList( "signature", - generateSignature( - definition.getTypeInference( - ctx.getCatalogManager().getDataTypeFactory()), - function.toString(), - definition))); + generateSignature(typeInference, function.toString(), definition))); + rows.addAll(buildPtfMetadataRows(definition, typeInference)); } return buildTableResult( @@ -129,4 +135,64 @@ public TableResultInternal execute(Context ctx) { new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, rows.stream().map(List::toArray).toArray(Object[][]::new)); } + + /** + * Builds supplemental info rows from the given {@link FunctionDefinition} and {@link + * TypeInference}: a {@code state: } row per state entry (with type and TTL) and, for + * PROCESS_TABLE functions, three capability rows: {@code accepts system arguments} (whether the + * framework auto-injects {@code uid} / {@code on_time}), {@code is changelog function} (whether + * the function implements {@link ChangelogFunction}), and {@code uses timers} (whether the + * function declares an {@code onTimer} method). Returns an empty list when none apply. + */ + private static List> buildPtfMetadataRows( + FunctionDefinition definition, TypeInference typeInference) { + final List> rows = new ArrayList<>(); + typeInference + .getStateTypeStrategies() + .forEach( + (name, strategy) -> + rows.add( + Arrays.asList( + "state: " + name, formatStateEntry(strategy)))); + if (definition.getKind() == FunctionKind.PROCESS_TABLE) { + rows.add( + Arrays.asList( + "accepts system arguments", + String.valueOf(!typeInference.disableSystemArguments()))); + rows.add( + Arrays.asList( + "is changelog function", + String.valueOf(definition instanceof ChangelogFunction))); + rows.add( + Arrays.asList( + "uses timers", + String.valueOf( + !ExtractionUtils.collectMethods( + definition.getClass(), + UserDefinedFunctionHelper + .PROCESS_TABLE_ON_TIMER) + .isEmpty()))); + } + return rows; + } + + private static String formatStateEntry(StateTypeStrategy strategy) { + // We have no CallContext at DESCRIBE time. Many strategies (including TTL lookups in + // DefaultStateTypeStrategy) ignore the context, but inferType forwards it to the wrapped + // TypeStrategy which may dereference it. Catch and degrade to rather than + // failing the entire DESCRIBE for one strategy that needs a real CallContext. + String typeStr; + try { + typeStr = strategy.inferType(null).map(Object::toString).orElse(""); + } catch (Exception e) { + typeStr = ""; + } + final Optional ttl; + try { + ttl = strategy.getTimeToLive(null); + } catch (Exception e) { + return "type=" + typeStr + ", ttl="; + } + return "type=" + typeStr + ", ttl=" + ttl.map(Duration::toString).orElse(""); + } }