diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 93baa65e3d1d4..4c60a96746c80 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -805,6 +805,20 @@ conversion: call("TYPEOF", input) call("TYPEOF", input, force_serializable) description: Returns the string representation of the input expression's data type. By default, the returned string is a summary string that might omit certain details for readability. If force_serializable is set to TRUE, the string represents a full data type that could be persisted in a catalog. Note that especially anonymous, inline data types have no serializable string representation. In this case, NULL is returned. + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: - sql: CARDINALITY(array) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index df74194ff9003..5cfcac4879137 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -935,6 +935,20 @@ conversion: 返回输入表达式的数据类型的字符串表示。默认情况下返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。 如果 force_serializable 设置为 TRUE,则字符串表示可以持久化保存在 catalog 中的完整数据类型。 请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下返回 `NULL`。 + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: - sql: CARDINALITY(array) diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 9e1711dc4558d..82c49f2edc63d 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1489,6 +1489,28 @@ def inet_ntoa(self) -> 'Expression[str]': """ return _unary_op("inetNtoa")(self) + @property + def is_valid_utf8(self) -> 'Expression[bool]': + """ + Returns true if the input bytes are a well-formed UTF-8 sequence, false otherwise. + Returns null if the input is null. + + Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + "overlong" encodings (using more bytes than necessary for the code point), code points + above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which + have no UTF-8 representation). + """ + return _unary_op("isValidUtf8")(self) + + @property + def make_valid_utf8(self) -> 'Expression[str]': + """ + Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode + replacement character U+FFFD. The substitution is lossy and irreversible. Returns null + if the input is null. + """ + return _unary_op("makeValidUtf8")(self) + def parse_url(self, part_to_extract: Union[str, 'Expression[str]'], key: Union[str, 'Expression[str]'] = None) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index e7a2078d1dbe4..543566c1e8cb8 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -188,6 +188,10 @@ def test_expression(self): self.assertEqual("INET_ATON(a)", str(expr1.inet_aton())) self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa())) + # utf-8 validation functions + self.assertEqual("IS_VALID_UTF8(a)", str(expr1.is_valid_utf8)) + self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8)) + # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 27921d4105df3..074224ae696fd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -138,6 +138,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT_TRUE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NULL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE; @@ -157,6 +158,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAKE_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_ENTRIES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_UNION; @@ -1493,6 +1495,28 @@ public OutType inetNtoa() { return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr())); } + /** + * Returns {@code true} if the input bytes are a well-formed UTF-8 sequence, {@code false} + * otherwise. Returns {@code null} if the input is {@code null}. + * + *
Specifically rejects: truncated multi-byte sequences (missing continuation bytes),
+ * "overlong" encodings (using more bytes than necessary for the code point), code points above
+ * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8
+ * representation).
+ */
+ public OutType isValidUtf8() {
+ return toApiSpecificExpression(unresolvedCall(IS_VALID_UTF8, toExpr()));
+ }
+
+ /**
+ * Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode
+ * replacement character {@code U+FFFD}. The substitution is lossy and irreversible. Returns
+ * {@code null} if the input is {@code null}.
+ */
+ public OutType makeValidUtf8() {
+ return toApiSpecificExpression(unresolvedCall(MAKE_VALID_UTF8, toExpr()));
+ }
+
/**
* Parse url and return various parameter of the URL. If accept any null arguments, return null.
*/
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index cd25361923c7e..abe54e6f48ebf 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -489,6 +489,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction")
.build();
+ public static final BuiltInFunctionDefinition IS_VALID_UTF8 =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("IS_VALID_UTF8")
+ .kind(SCALAR)
+ .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING)))
+ .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+ .runtimeClass(
+ "org.apache.flink.table.runtime.functions.scalar.IsValidUtf8Function")
+ .build();
+
+ public static final BuiltInFunctionDefinition MAKE_VALID_UTF8 =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("MAKE_VALID_UTF8")
+ .kind(SCALAR)
+ .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING)))
+ .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+ .runtimeClass(
+ "org.apache.flink.table.runtime.functions.scalar.MakeValidUtf8Function")
+ .build();
+
public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
BuiltInFunctionDefinition.newBuilder()
.name("$REPLICATE_ROWS$1")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java
new file mode 100644
index 0000000000000..5454372442169
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Tests for {@link BuiltInFunctionDefinitions#IS_VALID_UTF8} and {@link
+ * BuiltInFunctionDefinitions#MAKE_VALID_UTF8}.
+ */
+public class Utf8FunctionsITCase extends BuiltInFunctionTestBase {
+
+ private static final byte[] HELLO = "Hello".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] MULTIBYTE = "é€😀".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] INVALID_START = {(byte) 0x80};
+ private static final byte[] TRUNCATED = {(byte) 0xE2, (byte) 0x82};
+ private static final byte[] OVERLONG = {(byte) 0xC0, (byte) 0xAF};
+ private static final byte[] SURROGATE = {(byte) 0xED, (byte) 0xA0, (byte) 0x80};
+
+ @Override
+ Stream Decodes UTF-8 bytes leniently, replacing each invalid sequence with the Unicode replacement
+ * character {@code U+FFFD}. The substitution is lossy and irreversible.
+ */
+@Internal
+public final class MakeValidUtf8Function extends BuiltInScalarFunction {
+
+ public MakeValidUtf8Function(SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.MAKE_VALID_UTF8, context);
+ }
+
+ public @Nullable StringData eval(final @Nullable byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ return StringData.fromString(new String(bytes, StandardCharsets.UTF_8));
+ }
+}