diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 93baa65e3d1d4..4c60a96746c80 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -805,6 +805,20 @@ conversion: call("TYPEOF", input) call("TYPEOF", input, force_serializable) description: Returns the string representation of the input expression's data type. By default, the returned string is a summary string that might omit certain details for readability. If force_serializable is set to TRUE, the string represents a full data type that could be persisted in a catalog. Note that especially anonymous, inline data types have no serializable string representation. In this case, NULL is returned. + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). 
collection: - sql: CARDINALITY(array) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index df74194ff9003..5cfcac4879137 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -935,6 +935,20 @@ conversion: 返回输入表达式的数据类型的字符串表示。默认情况下返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。 如果 force_serializable 设置为 TRUE,则字符串表示可以持久化保存在 catalog 中的完整数据类型。 请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下返回 `NULL`。 + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). 
collection: - sql: CARDINALITY(array) diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 9e1711dc4558d..82c49f2edc63d 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1489,6 +1489,28 @@ def inet_ntoa(self) -> 'Expression[str]': """ return _unary_op("inetNtoa")(self) + @property + def is_valid_utf8(self) -> 'Expression[bool]': + """ + Returns true if the input bytes are a well-formed UTF-8 sequence, false otherwise. + Returns null if the input is null. + + Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + "overlong" encodings (using more bytes than necessary for the code point), code points + above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which + have no UTF-8 representation). + """ + return _unary_op("isValidUtf8")(self) + + @property + def make_valid_utf8(self) -> 'Expression[str]': + """ + Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode + replacement character U+FFFD. The substitution is lossy and irreversible. Returns null + if the input is null. E.g., the overlong bytes C0 AF yield two U+FFFD. 
+ """ + return _unary_op("makeValidUtf8")(self) + def parse_url(self, part_to_extract: Union[str, 'Expression[str]'], key: Union[str, 'Expression[str]'] = None) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index e7a2078d1dbe4..543566c1e8cb8 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -188,6 +188,10 @@ def test_expression(self): self.assertEqual("INET_ATON(a)", str(expr1.inet_aton())) self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa())) + # utf-8 validation functions + self.assertEqual("IS_VALID_UTF8(a)", str(expr1.is_valid_utf8)) + self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8)) + # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 27921d4105df3..074224ae696fd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -138,6 +138,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT_TRUE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NULL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY; import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE; @@ -157,6 +158,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAKE_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_ENTRIES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_UNION; @@ -1493,6 +1495,28 @@ public OutType inetNtoa() { return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr())); } + /** + * Returns {@code true} if the input bytes are a well-formed UTF-8 sequence, {@code false} + * otherwise. Returns {@code null} if the input is {@code null}. + * + * <p>Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + * "overlong" encodings (using more bytes than necessary for the code point), code points above + * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 + * representation). + */ + public OutType isValidUtf8() { + return toApiSpecificExpression(unresolvedCall(IS_VALID_UTF8, toExpr())); + } + + /** + * Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode + * replacement character {@code U+FFFD}. The substitution is lossy and irreversible. Returns + * {@code null} if the input is {@code null}. E.g., the overlong bytes C0 AF yield two U+FFFD. + */ + public OutType makeValidUtf8() { + return toApiSpecificExpression(unresolvedCall(MAKE_VALID_UTF8, toExpr())); + } + /** * Parse url and return various parameter of the URL. If accept any null arguments, return null. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index cd25361923c7e..abe54e6f48ebf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -489,6 +489,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction") .build(); + public static final BuiltInFunctionDefinition IS_VALID_UTF8 = + BuiltInFunctionDefinition.newBuilder() + .name("IS_VALID_UTF8") + .kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.IsValidUtf8Function") + .build(); + + public static final BuiltInFunctionDefinition MAKE_VALID_UTF8 = + 
BuiltInFunctionDefinition.newBuilder() + .name("MAKE_VALID_UTF8") + .kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.MakeValidUtf8Function") + .build(); + public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS = BuiltInFunctionDefinition.newBuilder() .name("$REPLICATE_ROWS$1") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java new file mode 100644 index 0000000000000..5454372442169 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; + +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.Expressions.$; + +/** + * Tests for {@link BuiltInFunctionDefinitions#IS_VALID_UTF8} and {@link + * BuiltInFunctionDefinitions#MAKE_VALID_UTF8}. + */ +public class Utf8FunctionsITCase extends BuiltInFunctionTestBase { + + private static final byte[] HELLO = "Hello".getBytes(StandardCharsets.UTF_8); + private static final byte[] MULTIBYTE = "é€😀".getBytes(StandardCharsets.UTF_8); + private static final byte[] INVALID_START = {(byte) 0x80}; + private static final byte[] TRUNCATED = {(byte) 0xE2, (byte) 0x82}; + private static final byte[] OVERLONG = {(byte) 0xC0, (byte) 0xAF}; + private static final byte[] SURROGATE = {(byte) 0xED, (byte) 0xA0, (byte) 0x80}; + + @Override + Stream<TestSetSpec> getTestSetSpecs() { + return Stream.of(isValidUtf8Cases(), makeValidUtf8Cases()); + } + + private TestSetSpec isValidUtf8Cases() { + return TestSetSpec.forFunction(BuiltInFunctionDefinitions.IS_VALID_UTF8) + .onFieldsWithData( + null, HELLO, MULTIBYTE, INVALID_START, TRUNCATED, OVERLONG, SURROGATE) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES()) + .testSqlResult("IS_VALID_UTF8(f0)", null, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f1)", true, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f2)", true, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f3)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f4)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f5)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f6)", false, DataTypes.BOOLEAN().nullable()) + // Table API 
method routes to the same definition. + .testTableApiResult($("f1").isValidUtf8(), true, DataTypes.BOOLEAN().nullable()) + .testTableApiResult($("f3").isValidUtf8(), false, DataTypes.BOOLEAN().nullable()); + } + + private TestSetSpec makeValidUtf8Cases() { + // Lenient decode equivalent to new String(b, UTF_8). The JDK largely follows the W3C + // maximal-subpart rule (one U+FFFD per maximal ill-formed subpart), except for surrogates. + final byte[] mixed = {'A', 'B', (byte) 0x80, 'C', 'D'}; + return TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAKE_VALID_UTF8) + .onFieldsWithData( + null, + HELLO, + MULTIBYTE, + INVALID_START, + TRUNCATED, + OVERLONG, + SURROGATE, + mixed) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES()) + .testSqlResult("MAKE_VALID_UTF8(f0)", null, DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f1)", "Hello", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f2)", "é€😀", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f3)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f4)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f5)", "��", DataTypes.STRING().nullable()) + // JDK's UTF-8 decoder consumes the entire 3-byte surrogate attempt and emits + // a single U+FFFD; this differs from the W3C maximal-subpart count of 3. 
+ .testSqlResult("MAKE_VALID_UTF8(f6)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f7)", "AB�CD", DataTypes.STRING().nullable()) + .testTableApiResult($("f1").makeValidUtf8(), "Hello", DataTypes.STRING().nullable()) + .testTableApiResult($("f3").makeValidUtf8(), "�", DataTypes.STRING().nullable()); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java new file mode 100644 index 0000000000000..5d6a4c8476b4e --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.StringUtf8Utils; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#IS_VALID_UTF8}; null input yields null. */ +@Internal +public final class IsValidUtf8Function extends BuiltInScalarFunction { + + public IsValidUtf8Function(SpecializedContext context) { + super(BuiltInFunctionDefinitions.IS_VALID_UTF8, context); + } + + public @Nullable Boolean eval(final @Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + return StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, 0, bytes.length) < 0; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java new file mode 100644 index 0000000000000..cdf0d6b98a194 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + +import javax.annotation.Nullable; + +import java.nio.charset.StandardCharsets; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#MAKE_VALID_UTF8}. + * + * <p>Decodes UTF-8 bytes leniently, replacing each invalid sequence with the Unicode replacement + * character {@code U+FFFD}. The substitution is lossy and irreversible. Null input yields null. + */ +@Internal +public final class MakeValidUtf8Function extends BuiltInScalarFunction { + + public MakeValidUtf8Function(SpecializedContext context) { + super(BuiltInFunctionDefinitions.MAKE_VALID_UTF8, context); + } + + public @Nullable StringData eval(final @Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + return StringData.fromString(new String(bytes, StandardCharsets.UTF_8)); + } +}