Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.cli.utils;

import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.AggregateFunction;

/**
* Minimal {@link AggregateFunction} used by {@code function.q} to exercise {@code DESCRIBE FUNCTION
* EXTENDED} on a function whose accumulator surfaces as a {@code state:} row. The body is a no-op
* since only the introspection output is asserted.
*/
public class DescribeFunctionTestAgg extends AggregateFunction<Long, DescribeFunctionTestAgg.Acc> {

/** Accumulator type for the aggregate. */
public static class Acc {
public Long sum;
public Long count;
}

@Override
public Acc createAccumulator() {
return new Acc();
}

public void accumulate(@StateHint(ttl = "2 d") Acc acc, Long value) {}

@Override
public Long getValue(Acc accumulator) {
return 0L;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.cli.utils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

/**
* Minimal Process Table Function used by {@code function.q} to exercise the {@code false} side of
* the per-PTF capability flags in {@code DESCRIBE FUNCTION EXTENDED}: no {@link
* org.apache.flink.table.annotation.StateHint StateHint} (no {@code state:} row), does not
* implement {@link org.apache.flink.table.functions.ChangelogFunction ChangelogFunction} ({@code is
* changelog function = false}), and declares no {@code onTimer} ({@code uses timers = false}).
*/
public class DescribeFunctionTestMinimalPtf extends ProcessTableFunction<Long> {

public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.cli.utils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.functions.ChangelogFunction;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

/**
* Process Table Function used by {@code function.q} to exercise {@code DESCRIBE FUNCTION EXTENDED}.
*
* <p>Carries: a state entry with TTL (via {@link StateHint}), a table arg with multiple traits (via
* {@link ArgumentHint}), an {@code onTimer} method, and an implementation of {@link
* ChangelogFunction} so that the {@code emits updates} and {@code uses timers} flags both light up.
* The bodies are no-ops since only the introspection output is asserted.
*/
public class DescribeFunctionTestPtf extends ProcessTableFunction<Long>
implements ChangelogFunction {

public void eval(
@StateHint(type = @DataTypeHint("ROW<count BIGINT>"), ttl = "1 d") Row state,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.OPTIONAL_PARTITION_BY})
Row input) {}

public void onTimer(Row state) {}

@Override
public ChangelogMode getChangelogMode(ChangelogContext changelogContext) {
return ChangelogMode.insertOnly();
}
}
76 changes: 76 additions & 0 deletions flink-table/flink-sql-client/src/test/resources/sql/function.q
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,79 @@ describe function extended temp_upperudf;
+---------------------------+-------------------------------+
7 rows in set
!ok

# ==========================================================================
# test describe function on a process table function (PTF)
# ==========================================================================

create temporary system function my_ptf as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestPtf';
[INFO] Execute statement succeeded.
!info

describe function extended my_ptf;
+---------------------------+---------------------------------------------------------------------+
| info name | info value |
+---------------------------+---------------------------------------------------------------------+
| is system function | true |
| is temporary | true |
| kind | PROCESS_TABLE |
| requirements | [] |
| is deterministic | true |
| supports constant folding | true |
| signature | my_ptf(input => {TABLE, SET SEMANTIC TABLE, OPTIONAL PARTITION BY}) |
| state: state | type=ROW<`count` BIGINT>, ttl=PT24H |
| accepts system arguments | true |
| is changelog function | true |
| uses timers | true |
+---------------------------+---------------------------------------------------------------------+
11 rows in set
!ok

# A minimal PTF demonstrates the "false" side of all three capability flags:
# no @StateHint, no ChangelogFunction implementation, no onTimer method.
create temporary system function my_minimal_ptf as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestMinimalPtf';
[INFO] Execute statement succeeded.
!info

describe function extended my_minimal_ptf;
+---------------------------+------------------------------------------------------+
| info name | info value |
+---------------------------+------------------------------------------------------+
| is system function | true |
| is temporary | true |
| kind | PROCESS_TABLE |
| requirements | [] |
| is deterministic | true |
| supports constant folding | true |
| signature | my_minimal_ptf(input => {TABLE, SET SEMANTIC TABLE}) |
| accepts system arguments | true |
| is changelog function | false |
| uses timers | false |
+---------------------------+------------------------------------------------------+
10 rows in set
!ok

# ==========================================================================
# test describe function on a user-defined aggregate function (accumulator
# surfaces via the same state:* row mechanism as PTFs)
# ==========================================================================

create temporary system function my_agg as 'org.apache.flink.table.client.cli.utils.DescribeFunctionTestAgg';
[INFO] Execute statement succeeded.
!info

describe function extended my_agg;
+---------------------------+---------------------------------------------------------------------------------------------------------------------------------+
| info name | info value |
+---------------------------+---------------------------------------------------------------------------------------------------------------------------------+
| is system function | true |
| is temporary | true |
| kind | AGGREGATE |
| requirements | [] |
| is deterministic | true |
| supports constant folding | true |
| signature | my_agg(value => BIGINT) |
| state: acc | type=STRUCTURED<'org.apache.flink.table.client.cli.utils.DescribeFunctionTestAgg$Acc', `count` BIGINT, `sum` BIGINT>, ttl=PT48H |
+---------------------------+---------------------------------------------------------------------------------------------------------------------------------+
8 rows in set
!ok
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.ChangelogFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.ExtractionUtils;
import org.apache.flink.table.types.inference.StateTypeStrategy;
import org.apache.flink.table.types.inference.TypeInference;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -114,19 +121,78 @@ public TableResultInternal execute(Context ctx) {
Arrays.asList(
"supports constant folding",
String.valueOf(definition.supportsConstantFolding())));
final TypeInference typeInference =
definition.getTypeInference(ctx.getCatalogManager().getDataTypeFactory());
rows.add(
Arrays.asList(
"signature",
generateSignature(
definition.getTypeInference(
ctx.getCatalogManager().getDataTypeFactory()),
function.toString(),
definition)));
generateSignature(typeInference, function.toString(), definition)));
rows.addAll(buildPtfMetadataRows(definition, typeInference));
}

return buildTableResult(
new String[] {"info name", "info value"},
new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
rows.stream().map(List::toArray).toArray(Object[][]::new));
}

/**
* Builds supplemental info rows from the given {@link FunctionDefinition} and {@link
* TypeInference}: a {@code state: <name>} row per state entry (with type and TTL) and, for
* PROCESS_TABLE functions, three capability rows: {@code accepts system arguments} (whether the
* framework auto-injects {@code uid} / {@code on_time}), {@code is changelog function} (whether
* the function implements {@link ChangelogFunction}), and {@code uses timers} (whether the
* function declares an {@code onTimer} method). Returns an empty list when none apply.
*/
private static List<List<Object>> buildPtfMetadataRows(
FunctionDefinition definition, TypeInference typeInference) {
final List<List<Object>> rows = new ArrayList<>();
typeInference
.getStateTypeStrategies()
.forEach(
(name, strategy) ->
rows.add(
Arrays.asList(
"state: " + name, formatStateEntry(strategy))));
if (definition.getKind() == FunctionKind.PROCESS_TABLE) {
rows.add(
Arrays.asList(
"accepts system arguments",
String.valueOf(!typeInference.disableSystemArguments())));
rows.add(
Arrays.asList(
"is changelog function",
String.valueOf(definition instanceof ChangelogFunction)));
rows.add(
Arrays.asList(
"uses timers",
String.valueOf(
!ExtractionUtils.collectMethods(
definition.getClass(),
UserDefinedFunctionHelper
.PROCESS_TABLE_ON_TIMER)
.isEmpty())));
}
return rows;
}

private static String formatStateEntry(StateTypeStrategy strategy) {
// We have no CallContext at DESCRIBE time. Many strategies (including TTL lookups in
// DefaultStateTypeStrategy) ignore the context, but inferType forwards it to the wrapped
// TypeStrategy which may dereference it. Catch and degrade to <unknown> rather than
// failing the entire DESCRIBE for one strategy that needs a real CallContext.
String typeStr;
try {
typeStr = strategy.inferType(null).map(Object::toString).orElse("<unknown>");
} catch (Exception e) {
typeStr = "<unknown>";
}
final Optional<Duration> ttl;
try {
ttl = strategy.getTimeToLive(null);
} catch (Exception e) {
return "type=" + typeStr + ", ttl=<unknown>";
}
return "type=" + typeStr + ", ttl=" + ttl.map(Duration::toString).orElse("<default>");
}
}