From 5e76616e10209645fb617502e6055cb932e31745 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Mon, 4 May 2026 18:12:01 +0200 Subject: [PATCH 1/4] [FLINK-39602][table] Add IS_VALID_UTF8 and MAKE_VALID_UTF8 built-in functions --- docs/data/sql_functions.yml | 16 ++ docs/data/sql_functions_zh.yml | 16 ++ .../table/api/internal/BaseExpressions.java | 24 +++ .../functions/BuiltInFunctionDefinitions.java | 20 +++ .../flink/table/utils/EncodingUtils.java | 155 ++++++++++++++++++ .../flink/table/utils/EncodingUtilsTest.java | 76 +++++++++ .../functions/Utf8FunctionsITCase.java | 107 ++++++++++++ .../functions/scalar/IsValidUtf8Function.java | 42 +++++ .../scalar/MakeValidUtf8Function.java | 49 ++++++ 9 files changed, 505 insertions(+) create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 93baa65e3d1d4..b56bd1391ce68 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -805,6 +805,22 @@ conversion: call("TYPEOF", input) call("TYPEOF", input, force_serializable) description: Returns the string representation of the input expression's data type. By default, the returned string is a summary string that might omit certain details for readability. If force_serializable is set to TRUE, the string represents a full data type that could be persisted in a catalog. Note that especially anonymous, inline data types have no serializable string representation. In this case, NULL is returned. + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for routing records with invalid UTF-8 to a dead-letter sink: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + If you want to explicitly have the behavior of silently substituting invalid bytes with `U+FFFD` when doing a `CAST(bytes AS STRING)`, replace the cast with `MAKE_VALID_UTF8(bytes)`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: - sql: CARDINALITY(array) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index df74194ff9003..b3aadbca12831 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -935,6 +935,22 @@ conversion: 返回输入表达式的数据类型的字符串表示。默认情况下返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。 如果 force_serializable 设置为 TRUE,则字符串表示可以持久化保存在 catalog 中的完整数据类型。 请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下返回 `NULL`。 + - sql: IS_VALID_UTF8(bytes) + table: BYTES.isValidUtf8() + description: | + Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. + + Useful for routing records with invalid UTF-8 to a dead-letter sink: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + + E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. + - sql: MAKE_VALID_UTF8(bytes) + table: BYTES.makeValidUtf8() + description: | + Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + + If you want to explicitly have the behavior of silently substituting invalid bytes with `U+FFFD` when doing a `CAST(bytes AS STRING)`, replace the cast with `MAKE_VALID_UTF8(bytes)`. + + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: - sql: CARDINALITY(array) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 27921d4105df3..20ff2badeea87 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -138,6 +138,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT_TRUE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NULL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE; @@ -157,6 +158,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAKE_VALID_UTF8; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_ENTRIES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_UNION; @@ -1493,6 +1495,28 @@ public OutType inetNtoa() { return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr())); } + /** + * Returns {@code true} if the input bytes form a well-formed UTF-8 sequence, {@code false} + * otherwise. Returns {@code null} if the input is {@code null}. + * + *

Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + * "overlong" encodings (using more bytes than necessary for the code point), code points above + * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 + * representation). + */ + public OutType isValidUtf8() { + return toApiSpecificExpression(unresolvedCall(IS_VALID_UTF8, toExpr())); + } + + /** + * Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode + * replacement character {@code U+FFFD}. The substitution is lossy and irreversible. Returns + * {@code null} if the input is {@code null}. + */ + public OutType makeValidUtf8() { + return toApiSpecificExpression(unresolvedCall(MAKE_VALID_UTF8, toExpr())); + } + /** * Parse url and return various parameter of the URL. If accept any null arguments, return null. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index cd25361923c7e..abe54e6f48ebf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -489,6 +489,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction") .build(); + public static final BuiltInFunctionDefinition IS_VALID_UTF8 = + BuiltInFunctionDefinition.newBuilder() + .name("IS_VALID_UTF8") + .kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.IsValidUtf8Function") + .build(); + + public static final BuiltInFunctionDefinition MAKE_VALID_UTF8 = + BuiltInFunctionDefinition.newBuilder() + .name("MAKE_VALID_UTF8") + .kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.MakeValidUtf8Function") + .build(); + public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS = BuiltInFunctionDefinition.newBuilder() .name("$REPLICATE_ROWS$1") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java index b131a3c0617e2..4eed6c24252cf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.io.Serializable; @@ -187,6 +188,160 @@ public static String hex(byte[] bytes) { return new String(hexChars); } + // -------------------------------------------------------------------------------------------- + // UTF-8 validation + // + // The validator below is intentionally self-contained so this PR does not depend on + // FLINK-39601. Once that ticket lands, isValidUtf8 should delegate to + // StringUtf8Utils.firstInvalidUtf8ByteIndex and the private helpers below can be removed. + // -------------------------------------------------------------------------------------------- + + /** + * Returns {@code true} if the given bytes form a well-formed UTF-8 sequence. Returns {@code + * false} for {@code null} input (null is not a valid byte sequence). + * + *

Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + * "overlong" encodings (using more bytes than necessary for the code point), code points above + * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 + * representation). + */ + public static boolean isValidUtf8(final byte[] bytes) { + return bytes != null && firstInvalidUtf8ByteIndex(bytes, 0, bytes.length) < 0; + } + + /** + * Returns {@code true} if {@code numBytes} starting at {@code offset} form a well-formed UTF-8 + * sequence. Returns {@code false} for {@code null} input. See {@link #isValidUtf8(byte[])} for + * what counts as well-formed. + * + * @throws IllegalArgumentException if the offset/length range is out-of-bounds + */ + public static boolean isValidUtf8(final byte[] bytes, final int offset, final int numBytes) { + return bytes != null && firstInvalidUtf8ByteIndex(bytes, offset, numBytes) < 0; + } + + // Bit-pattern predicates for UTF-8 byte categorization. The JIT inlines these so they cost + // nothing at runtime, but they make the validator below read like prose. + private static boolean isAsciiByte(int b) { + return b >= 0; + } + + private static boolean is2ByteLead(int b) { + // 110xxxxx; (b & 0x1e) != 0 rejects the overlong leads 0xC0 and 0xC1 + return (b >> 5) == -2 && (b & 0x1e) != 0; + } + + private static boolean is3ByteLead(int b) { + return (b >> 4) == -2; // 1110xxxx + } + + private static boolean is4ByteLead(int b) { + return (b >> 3) == -2; // 11110xxx + } + + private static boolean isContinuation(int b) { + return (b & 0xc0) == 0x80; // 10xxxxxx + } + + private static boolean isOverlong3(int b1, int b2) { + // 0xE0 followed by 0x80-0x9F encodes a code point already representable in 2 bytes + return b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80; + } + + private static char decode3ByteSequence(int b1, int b2, int b3) { + return (char) + ((b1 << 12) + ^ (b2 << 6) + ^ (b3 ^ (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80)))); + } + + private static int decode4ByteSequence(int b1, int b2, int b3, int b4) { + return (b1 << 18) + ^ (b2 << 12) + ^ (b3 << 6) + ^ (b4 + ^ (((byte) 0xF0 << 18) + ^ ((byte) 0x80 << 12) + ^ ((byte) 0x80 << 6) + ^ ((byte) 0x80))); + } + + private static int firstInvalidUtf8ByteIndex( + final byte[] bytes, final int offset, final int numBytes) { + Preconditions.checkArgument(offset >= 0, "offset must be >= 0, was %s", offset); + Preconditions.checkArgument(numBytes >= 0, "numBytes must be >= 0, was %s", numBytes); + Preconditions.checkArgument( + offset <= bytes.length - numBytes, + "offset (%s) + numBytes (%s) exceeds array length (%s)", + offset, + numBytes, + bytes.length); + + int sp = offset; + final int sl = sp + numBytes; + + // ASCII fast-path + while (sp < sl && isAsciiByte(bytes[sp])) { + sp++; + } + + while (sp < sl) { + final int start = sp; + final int b1 = bytes[sp++]; + + if (isAsciiByte(b1)) { + continue; + } + + if (is2ByteLead(b1)) { + if (sp >= sl) { + return sl; + } + if (!isContinuation(bytes[sp++])) { + return start; + } + continue; + } + + if (is3ByteLead(b1)) { + if (sp + 1 >= sl) { + return sl; + } + final int b2 = bytes[sp++]; + final int b3 = bytes[sp++]; + if (isOverlong3(b1, b2) || !isContinuation(b2) || !isContinuation(b3)) { + return start; + } + if (Character.isSurrogate(decode3ByteSequence(b1, b2, b3))) { + return start; + } + continue; + } + + if (is4ByteLead(b1)) { + if (sp + 2 >= sl) { + return sl; + } + final int b2 = bytes[sp++]; + final int b3 = bytes[sp++]; + final int b4 = bytes[sp++]; + if (!isContinuation(b2) || !isContinuation(b3) || !isContinuation(b4)) { + return start; + } + // Shortest-form check catches both overlong 4-byte forms and code points + // above U+10FFFF (anything not in the supplementary plane is invalid here). + if (!Character.isSupplementaryCodePoint(decode4ByteSequence(b1, b2, b3, b4))) { + return start; + } + continue; + } + + // Continuation byte without a lead, or a 5+-byte lead (RFC 3629 forbids). + return start; + } + return -1; + } + /** * Converts an array of characters representing hexadecimal values into an array of bytes of * those same values. E.g. {@code unhex("12".getBytes())} returns {@code new byte[]{0x12}}. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java index 4c5d11dc4d6e6..8d7b0b797a6f3 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java @@ -21,9 +21,11 @@ import org.junit.jupiter.api.Test; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.Objects; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link org.apache.flink.table.utils.EncodingUtils}. */ class EncodingUtilsTest { @@ -61,6 +63,80 @@ void testRepetition() { assertThat(EncodingUtils.repeat("we", 3)).isEqualTo("wewewe"); } + @Test + void testIsValidUtf8() { + // ASCII fast-path + assertThat(EncodingUtils.isValidUtf8(new byte[0])).isTrue(); + assertThat(EncodingUtils.isValidUtf8(new byte[] {0x00})).isTrue(); + assertThat(EncodingUtils.isValidUtf8("Hello, world!".getBytes(StandardCharsets.UTF_8))) + .isTrue(); + + // Multi-byte sequences: 2-byte (é), 3-byte (€), 4-byte (😀) + assertThat(EncodingUtils.isValidUtf8("é€😀".getBytes(StandardCharsets.UTF_8))).isTrue(); + + // Code-point boundaries at each width (smallest + largest) + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xC2, (byte) 0x80})) + .isTrue(); // U+0080 + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xDF, (byte) 0xBF})) + .isTrue(); // U+07FF + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE0, (byte) 0xA0, (byte) 0x80})) + .isTrue(); // U+0800 + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xEF, (byte) 0xBF, (byte) 0xBF})) + .isTrue(); // U+FFFF + assertThat( + EncodingUtils.isValidUtf8( + new byte[] {(byte) 0xF0, (byte) 0x90, (byte) 0x80, (byte) 0x80})) + .isTrue(); // U+10000 + assertThat( + EncodingUtils.isValidUtf8( + new byte[] {(byte) 0xF4, (byte) 0x8F, (byte) 0xBF, (byte) 0xBF})) + .isTrue(); // U+10FFFF (largest valid) + + // Above U+10FFFF and forbidden lead bytes (RFC 3629) + assertThat( + EncodingUtils.isValidUtf8( + new byte[] {(byte) 0xF4, (byte) 0x90, (byte) 0x80, (byte) 0x80})) + .isFalse(); // U+110000 + + // Stray continuation byte without a lead + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0x80})).isFalse(); + + // Truncated multi-byte sequences + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE2, (byte) 0x82})).isFalse(); + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xF0})).isFalse(); + + // Overlong forms of '/' at every width + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xC0, (byte) 0xAF})).isFalse(); + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE0, (byte) 0x80, (byte) 0xAF})) + .isFalse(); + assertThat( + EncodingUtils.isValidUtf8( + new byte[] {(byte) 0xF0, (byte) 0x80, (byte) 0x80, (byte) 0xAF})) + .isFalse(); + + // UTF-16 surrogates (high + low, smallest + largest) + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xED, (byte) 0xA0, (byte) 0x80})) + .isFalse(); // U+D800 + assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xED, (byte) 0xBF, (byte) 0xBF})) + .isFalse(); // U+DFFF + + // Offset/length variant: only the inner range is validated + final byte[] padded = {(byte) 0x80, 'O', 'K', (byte) 0x80}; + assertThat(EncodingUtils.isValidUtf8(padded, 1, 2)).isTrue(); + assertThat(EncodingUtils.isValidUtf8(padded, 0, 4)).isFalse(); + assertThat(EncodingUtils.isValidUtf8(padded, padded.length, 0)).isTrue(); + + // null input -> false (Guava Strings.isNullOrEmpty convention) + assertThat(EncodingUtils.isValidUtf8((byte[]) null)).isFalse(); + assertThat(EncodingUtils.isValidUtf8(null, 0, 0)).isFalse(); + + // Bad bounds -> IllegalArgumentException + assertThatThrownBy(() -> EncodingUtils.isValidUtf8(new byte[3], 0, -1)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> EncodingUtils.isValidUtf8(new byte[3], 1, 3)) + .isInstanceOf(IllegalArgumentException.class); + } + @Test void testUnhex() { assertThat(EncodingUtils.unhex("".getBytes())).isEqualTo(new byte[0]); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java new file mode 100644 index 0000000000000..5454372442169 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; + +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.Expressions.$; + +/** + * Tests for {@link BuiltInFunctionDefinitions#IS_VALID_UTF8} and {@link + * BuiltInFunctionDefinitions#MAKE_VALID_UTF8}. + */ +public class Utf8FunctionsITCase extends BuiltInFunctionTestBase { + + private static final byte[] HELLO = "Hello".getBytes(StandardCharsets.UTF_8); + private static final byte[] MULTIBYTE = "é€😀".getBytes(StandardCharsets.UTF_8); + private static final byte[] INVALID_START = {(byte) 0x80}; + private static final byte[] TRUNCATED = {(byte) 0xE2, (byte) 0x82}; + private static final byte[] OVERLONG = {(byte) 0xC0, (byte) 0xAF}; + private static final byte[] SURROGATE = {(byte) 0xED, (byte) 0xA0, (byte) 0x80}; + + @Override + Stream getTestSetSpecs() { + return Stream.of(isValidUtf8Cases(), makeValidUtf8Cases()); + } + + private TestSetSpec isValidUtf8Cases() { + return TestSetSpec.forFunction(BuiltInFunctionDefinitions.IS_VALID_UTF8) + .onFieldsWithData( + null, HELLO, MULTIBYTE, INVALID_START, TRUNCATED, OVERLONG, SURROGATE) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES()) + .testSqlResult("IS_VALID_UTF8(f0)", null, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f1)", true, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f2)", true, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f3)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f4)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f5)", false, DataTypes.BOOLEAN().nullable()) + .testSqlResult("IS_VALID_UTF8(f6)", false, DataTypes.BOOLEAN().nullable()) + // Table API method routes to the same definition. + .testTableApiResult($("f1").isValidUtf8(), true, DataTypes.BOOLEAN().nullable()) + .testTableApiResult($("f3").isValidUtf8(), false, DataTypes.BOOLEAN().nullable()); + } + + private TestSetSpec makeValidUtf8Cases() { + // Lenient decode equivalent to new String(b, UTF_8). Java follows the W3C + // maximal-subpart rule: one replacement char per maximal ill-formed subpart. + final byte[] mixed = {'A', 'B', (byte) 0x80, 'C', 'D'}; + return TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAKE_VALID_UTF8) + .onFieldsWithData( + null, + HELLO, + MULTIBYTE, + INVALID_START, + TRUNCATED, + OVERLONG, + SURROGATE, + mixed) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES()) + .testSqlResult("MAKE_VALID_UTF8(f0)", null, DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f1)", "Hello", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f2)", "é€😀", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f3)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f4)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f5)", "��", DataTypes.STRING().nullable()) + // JDK's UTF-8 decoder consumes the entire 3-byte surrogate attempt and emits + // a single U+FFFD; this differs from the W3C maximal-subpart count of 3. + .testSqlResult("MAKE_VALID_UTF8(f6)", "�", DataTypes.STRING().nullable()) + .testSqlResult("MAKE_VALID_UTF8(f7)", "AB�CD", DataTypes.STRING().nullable()) + .testTableApiResult($("f1").makeValidUtf8(), "Hello", DataTypes.STRING().nullable()) + .testTableApiResult($("f3").makeValidUtf8(), "�", DataTypes.STRING().nullable()); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java new file mode 100644 index 0000000000000..36b4af998aff2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.utils.EncodingUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#IS_VALID_UTF8}. */ +@Internal +public final class IsValidUtf8Function extends BuiltInScalarFunction { + + public IsValidUtf8Function(SpecializedContext context) { + super(BuiltInFunctionDefinitions.IS_VALID_UTF8, context); + } + + public @Nullable Boolean eval(final @Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + return EncodingUtils.isValidUtf8(bytes); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java new file mode 100644 index 0000000000000..cdf0d6b98a194 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + +import javax.annotation.Nullable; + +import java.nio.charset.StandardCharsets; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#MAKE_VALID_UTF8}. + * + *

Decodes UTF-8 bytes leniently, replacing each invalid sequence with the Unicode replacement + * character {@code U+FFFD}. The substitution is lossy and irreversible. + */ +@Internal +public final class MakeValidUtf8Function extends BuiltInScalarFunction { + + public MakeValidUtf8Function(SpecializedContext context) { + super(BuiltInFunctionDefinitions.MAKE_VALID_UTF8, context); + } + + public @Nullable StringData eval(final @Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + return StringData.fromString(new String(bytes, StandardCharsets.UTF_8)); + } +} From 34ace5d98d42293d888e3a848829a1ec2ad543fc Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Tue, 5 May 2026 14:35:39 +0200 Subject: [PATCH 2/4] [FLINK-39602][table] Address review: docs wording --- docs/data/sql_functions.yml | 4 +--- docs/data/sql_functions_zh.yml | 4 +--- .../org/apache/flink/table/api/internal/BaseExpressions.java | 2 +- .../main/java/org/apache/flink/table/utils/EncodingUtils.java | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index b56bd1391ce68..4c60a96746c80 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -810,7 +810,7 @@ conversion: description: | Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. - Useful for routing records with invalid UTF-8 to a dead-letter sink: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. - sql: MAKE_VALID_UTF8(bytes) @@ -818,8 +818,6 @@ conversion: description: | Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. - If you want to explicitly have the behavior of silently substituting invalid bytes with `U+FFFD` when doing a `CAST(bytes AS STRING)`, replace the cast with `MAKE_VALID_UTF8(bytes)`. - E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index b3aadbca12831..5cfcac4879137 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -940,7 +940,7 @@ conversion: description: | Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`. - Useful for routing records with invalid UTF-8 to a dead-letter sink: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. + Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects. E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`. - sql: MAKE_VALID_UTF8(bytes) @@ -948,8 +948,6 @@ conversion: description: | Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. - If you want to explicitly have the behavior of silently substituting invalid bytes with `U+FFFD` when doing a `CAST(bytes AS STRING)`, replace the cast with `MAKE_VALID_UTF8(bytes)`. - E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 20ff2badeea87..074224ae696fd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -1496,7 +1496,7 @@ public OutType inetNtoa() { } /** - * Returns {@code true} if the input bytes form a well-formed UTF-8 sequence, {@code false} + * Returns {@code true} if the input bytes are a well-formed UTF-8 sequence, {@code false} * otherwise. Returns {@code null} if the input is {@code null}. * *

Specifically rejects: truncated multi-byte sequences (missing continuation bytes), diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java index 4eed6c24252cf..b4472b8e70c1b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java @@ -197,7 +197,7 @@ public static String hex(byte[] bytes) { // -------------------------------------------------------------------------------------------- /** - * Returns {@code true} if the given bytes form a well-formed UTF-8 sequence. Returns {@code + * Returns {@code true} if the given bytes are a well-formed UTF-8 sequence. Returns {@code * false} for {@code null} input (null is not a valid byte sequence). * *

Specifically rejects: truncated multi-byte sequences (missing continuation bytes), From b03222d444c962b394907dd751a8aec21391213e Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 6 May 2026 11:31:04 +0200 Subject: [PATCH 3/4] [FLINK-39602][table] Delegate IS_VALID_UTF8 to StringUtf8Utils after FLINK-39601 merge --- .../flink/table/utils/EncodingUtils.java | 155 ------------------ .../flink/table/utils/EncodingUtilsTest.java | 76 --------- .../functions/scalar/IsValidUtf8Function.java | 4 +- 3 files changed, 2 insertions(+), 233 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java index b4472b8e70c1b..b131a3c0617e2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java @@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; import java.io.IOException; import java.io.Serializable; @@ -188,160 +187,6 @@ public static String hex(byte[] bytes) { return new String(hexChars); } - // -------------------------------------------------------------------------------------------- - // UTF-8 validation - // - // The validator below is intentionally self-contained so this PR does not depend on - // FLINK-39601. Once that ticket lands, isValidUtf8 should delegate to - // StringUtf8Utils.firstInvalidUtf8ByteIndex and the private helpers below can be removed. - // -------------------------------------------------------------------------------------------- - - /** - * Returns {@code true} if the given bytes are a well-formed UTF-8 sequence. Returns {@code - * false} for {@code null} input (null is not a valid byte sequence). - * - *

Specifically rejects: truncated multi-byte sequences (missing continuation bytes), - * "overlong" encodings (using more bytes than necessary for the code point), code points above - * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 - * representation). - */ - public static boolean isValidUtf8(final byte[] bytes) { - return bytes != null && firstInvalidUtf8ByteIndex(bytes, 0, bytes.length) < 0; - } - - /** - * Returns {@code true} if {@code numBytes} starting at {@code offset} form a well-formed UTF-8 - * sequence. Returns {@code false} for {@code null} input. See {@link #isValidUtf8(byte[])} for - * what counts as well-formed. - * - * @throws IllegalArgumentException if the offset/length range is out-of-bounds - */ - public static boolean isValidUtf8(final byte[] bytes, final int offset, final int numBytes) { - return bytes != null && firstInvalidUtf8ByteIndex(bytes, offset, numBytes) < 0; - } - - // Bit-pattern predicates for UTF-8 byte categorization. The JIT inlines these so they cost - // nothing at runtime, but they make the validator below read like prose. - private static boolean isAsciiByte(int b) { - return b >= 0; - } - - private static boolean is2ByteLead(int b) { - // 110xxxxx; (b & 0x1e) != 0 rejects the overlong leads 0xC0 and 0xC1 - return (b >> 5) == -2 && (b & 0x1e) != 0; - } - - private static boolean is3ByteLead(int b) { - return (b >> 4) == -2; // 1110xxxx - } - - private static boolean is4ByteLead(int b) { - return (b >> 3) == -2; // 11110xxx - } - - private static boolean isContinuation(int b) { - return (b & 0xc0) == 0x80; // 10xxxxxx - } - - private static boolean isOverlong3(int b1, int b2) { - // 0xE0 followed by 0x80-0x9F encodes a code point already representable in 2 bytes - return b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80; - } - - private static char decode3ByteSequence(int b1, int b2, int b3) { - return (char) - ((b1 << 12) - ^ (b2 << 6) - ^ (b3 ^ (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80)))); - } - - private static int decode4ByteSequence(int b1, int b2, int b3, int b4) { - return (b1 << 18) - ^ (b2 << 12) - ^ (b3 << 6) - ^ (b4 - ^ (((byte) 0xF0 << 18) - ^ ((byte) 0x80 << 12) - ^ ((byte) 0x80 << 6) - ^ ((byte) 0x80))); - } - - private static int firstInvalidUtf8ByteIndex( - final byte[] bytes, final int offset, final int numBytes) { - Preconditions.checkArgument(offset >= 0, "offset must be >= 0, was %s", offset); - Preconditions.checkArgument(numBytes >= 0, "numBytes must be >= 0, was %s", numBytes); - Preconditions.checkArgument( - offset <= bytes.length - numBytes, - "offset (%s) + numBytes (%s) exceeds array length (%s)", - offset, - numBytes, - bytes.length); - - int sp = offset; - final int sl = sp + numBytes; - - // ASCII fast-path - while (sp < sl && isAsciiByte(bytes[sp])) { - sp++; - } - - while (sp < sl) { - final int start = sp; - final int b1 = bytes[sp++]; - - if (isAsciiByte(b1)) { - continue; - } - - if (is2ByteLead(b1)) { - if (sp >= sl) { - return sl; - } - if (!isContinuation(bytes[sp++])) { - return start; - } - continue; - } - - if (is3ByteLead(b1)) { - if (sp + 1 >= sl) { - return sl; - } - final int b2 = bytes[sp++]; - final int b3 = bytes[sp++]; - if (isOverlong3(b1, b2) || !isContinuation(b2) || !isContinuation(b3)) { - return start; - } - if (Character.isSurrogate(decode3ByteSequence(b1, b2, b3))) { - return start; - } - continue; - } - - if (is4ByteLead(b1)) { - if (sp + 2 >= sl) { - return sl; - } - final int b2 = bytes[sp++]; - final int b3 = bytes[sp++]; - final int b4 = bytes[sp++]; - if (!isContinuation(b2) || !isContinuation(b3) || !isContinuation(b4)) { - return start; - } - // Shortest-form check catches both overlong 4-byte forms and code points - // above U+10FFFF (anything not in the supplementary plane is invalid here). - if (!Character.isSupplementaryCodePoint(decode4ByteSequence(b1, b2, b3, b4))) { - return start; - } - continue; - } - - // Continuation byte without a lead, or a 5+-byte lead (RFC 3629 forbids). - return start; - } - return -1; - } - /** * Converts an array of characters representing hexadecimal values into an array of bytes of * those same values. E.g. {@code unhex("12".getBytes())} returns {@code new byte[]{0x12}}. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java index 8d7b0b797a6f3..4c5d11dc4d6e6 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java @@ -21,11 +21,9 @@ import org.junit.jupiter.api.Test; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.Objects; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link org.apache.flink.table.utils.EncodingUtils}. */ class EncodingUtilsTest { @@ -63,80 +61,6 @@ void testRepetition() { assertThat(EncodingUtils.repeat("we", 3)).isEqualTo("wewewe"); } - @Test - void testIsValidUtf8() { - // ASCII fast-path - assertThat(EncodingUtils.isValidUtf8(new byte[0])).isTrue(); - assertThat(EncodingUtils.isValidUtf8(new byte[] {0x00})).isTrue(); - assertThat(EncodingUtils.isValidUtf8("Hello, world!".getBytes(StandardCharsets.UTF_8))) - .isTrue(); - - // Multi-byte sequences: 2-byte (é), 3-byte (€), 4-byte (😀) - assertThat(EncodingUtils.isValidUtf8("é€😀".getBytes(StandardCharsets.UTF_8))).isTrue(); - - // Code-point boundaries at each width (smallest + largest) - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xC2, (byte) 0x80})) - .isTrue(); // U+0080 - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xDF, (byte) 0xBF})) - .isTrue(); // U+07FF - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE0, (byte) 0xA0, (byte) 0x80})) - .isTrue(); // U+0800 - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xEF, (byte) 0xBF, (byte) 0xBF})) - .isTrue(); // U+FFFF - assertThat( - EncodingUtils.isValidUtf8( - new byte[] {(byte) 0xF0, (byte) 0x90, (byte) 0x80, (byte) 0x80})) - .isTrue(); // U+10000 - assertThat( - EncodingUtils.isValidUtf8( - new byte[] {(byte) 0xF4, (byte) 0x8F, (byte) 0xBF, (byte) 0xBF})) - .isTrue(); // U+10FFFF (largest valid) - - // Above U+10FFFF and forbidden lead bytes (RFC 3629) - assertThat( - EncodingUtils.isValidUtf8( - new byte[] {(byte) 0xF4, (byte) 0x90, (byte) 0x80, (byte) 0x80})) - .isFalse(); // U+110000 - - // Stray continuation byte without a lead - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0x80})).isFalse(); - - // Truncated multi-byte sequences - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE2, (byte) 0x82})).isFalse(); - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xF0})).isFalse(); - - // Overlong forms of '/' at every width - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xC0, (byte) 0xAF})).isFalse(); - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xE0, (byte) 0x80, (byte) 0xAF})) - .isFalse(); - assertThat( - EncodingUtils.isValidUtf8( - new byte[] {(byte) 0xF0, (byte) 0x80, (byte) 0x80, (byte) 0xAF})) - .isFalse(); - - // UTF-16 surrogates (high + low, smallest + largest) - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xED, (byte) 0xA0, (byte) 0x80})) - .isFalse(); // U+D800 - assertThat(EncodingUtils.isValidUtf8(new byte[] {(byte) 0xED, (byte) 0xBF, (byte) 0xBF})) - .isFalse(); // U+DFFF - - // Offset/length variant: only the inner range is validated - final byte[] padded = {(byte) 0x80, 'O', 'K', (byte) 0x80}; - assertThat(EncodingUtils.isValidUtf8(padded, 1, 2)).isTrue(); - assertThat(EncodingUtils.isValidUtf8(padded, 0, 4)).isFalse(); - assertThat(EncodingUtils.isValidUtf8(padded, padded.length, 0)).isTrue(); - - // null input -> false (Guava Strings.isNullOrEmpty convention) - assertThat(EncodingUtils.isValidUtf8((byte[]) null)).isFalse(); - assertThat(EncodingUtils.isValidUtf8(null, 0, 0)).isFalse(); - - // Bad bounds -> IllegalArgumentException - assertThatThrownBy(() -> EncodingUtils.isValidUtf8(new byte[3], 0, -1)) - .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> EncodingUtils.isValidUtf8(new byte[3], 1, 3)) - .isInstanceOf(IllegalArgumentException.class); - } - @Test void testUnhex() { assertThat(EncodingUtils.unhex("".getBytes())).isEqualTo(new byte[0]); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java index 36b4af998aff2..5d6a4c8476b4e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java @@ -19,9 +19,9 @@ package org.apache.flink.table.runtime.functions.scalar; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.StringUtf8Utils; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; -import org.apache.flink.table.utils.EncodingUtils; import javax.annotation.Nullable; @@ -37,6 +37,6 @@ public IsValidUtf8Function(SpecializedContext context) { if (bytes == null) { return null; } - return EncodingUtils.isValidUtf8(bytes); + return StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, 0, bytes.length) < 0; } } From 57dddc2e1fff77268d90a0753c38d0c16e86cab9 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 6 May 2026 11:31:08 +0200 Subject: [PATCH 4/4] [FLINK-39602][python] Add Python bindings for IS_VALID_UTF8 and MAKE_VALID_UTF8 --- flink-python/pyflink/table/expression.py | 22 +++++++++++++++++++ .../pyflink/table/tests/test_expression.py | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 9e1711dc4558d..82c49f2edc63d 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1489,6 +1489,28 @@ def inet_ntoa(self) -> 'Expression[str]': """ return _unary_op("inetNtoa")(self) + @property + def is_valid_utf8(self) -> 'Expression[bool]': + """ + Returns true if the input bytes are a well-formed UTF-8 sequence, false otherwise. + Returns null if the input is null. + + Specifically rejects: truncated multi-byte sequences (missing continuation bytes), + "overlong" encodings (using more bytes than necessary for the code point), code points + above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which + have no UTF-8 representation). + """ + return _unary_op("isValidUtf8")(self) + + @property + def make_valid_utf8(self) -> 'Expression[str]': + """ + Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode + replacement character U+FFFD. The substitution is lossy and irreversible. Returns null + if the input is null. + """ + return _unary_op("makeValidUtf8")(self) + def parse_url(self, part_to_extract: Union[str, 'Expression[str]'], key: Union[str, 'Expression[str]'] = None) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index e7a2078d1dbe4..543566c1e8cb8 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -188,6 +188,10 @@ def test_expression(self): self.assertEqual("INET_ATON(a)", str(expr1.inet_aton())) self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa())) + # utf-8 validation functions + self.assertEqual("IS_VALID_UTF8(a)", str(expr1.is_valid_utf8)) + self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8)) + # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))