From 3c8d79f8d5a0075ecad862208428306fc2a1543c Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 22 Jun 2026 12:34:28 +0200 Subject: [PATCH] [FLINK-39866][table] Remove dead KEYVALUE code from SqlFunctionUtils and StringCallGen --- .../planner/codegen/calls/StringCallGen.scala | 10 -- .../data/binary/BinaryStringDataUtil.java | 115 --------------- .../runtime/functions/SqlFunctionUtils.java | 56 -------- .../table/data/BinaryStringDataTest.java | 132 ------------------ 4 files changed, 313 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index 33e335f0549b6..f9bc149e83186 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -560,16 +560,6 @@ object StringCallGen { } } - def generateKeyValue( - ctx: CodeGeneratorContext, - operands: Seq[GeneratedExpression], - returnType: LogicalType): GeneratedExpression = { - val className = classOf[SqlFunctionUtils].getCanonicalName - generateCallIfArgsNullable(ctx, returnType, operands) { - terms => s"$className.keyValue(${terms.mkString(",")})" - } - } - def generateHashCode( ctx: CodeGeneratorContext, operands: Seq[GeneratedExpression], diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java index 496d0dad50885..53c25f9a46a28 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java @@ -618,121 +618,6 @@ public static TimestampData toTimestamp( return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone); } - /** - * Parse target string as key-value string and return the value matches key name. If accept any - * null arguments, return null. example: keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2' - * keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL - * - * @param split1 separator between key-value tuple. - * @param split2 separator between key and value. - * @param keyName name of the key whose value you want return. - * @return target value. - */ - public static BinaryStringData keyValue( - BinaryStringData str, byte split1, byte split2, BinaryStringData keyName) { - str.ensureMaterialized(); - if (keyName == null || keyName.getSizeInBytes() == 0) { - return null; - } - if (str.inFirstSegment() && keyName.inFirstSegment()) { - // position in byte - int byteIdx = 0; - // position of last split1 - int lastSplit1Idx = -1; - while (byteIdx < str.getSizeInBytes()) { - // If find next split1 in str, process current kv - if (str.getSegments()[0].get(str.getOffset() + byteIdx) == split1) { - int currentKeyIdx = lastSplit1Idx + 1; - // If key of current kv is keyName, return the value directly - BinaryStringData value = - findValueOfKey(str, split2, keyName, currentKeyIdx, byteIdx); - if (value != null) { - return value; - } - lastSplit1Idx = byteIdx; - } - byteIdx++; - } - // process the string which is not ends with split1 - int currentKeyIdx = lastSplit1Idx + 1; - return findValueOfKey(str, split2, keyName, currentKeyIdx, str.getSizeInBytes()); - } else { - return keyValueSlow(str, split1, split2, keyName); - } - } - - private static BinaryStringData findValueOfKey( - BinaryStringData str, byte split, BinaryStringData keyName, int start, int end) { - int keyNameLen = keyName.getSizeInBytes(); - for (int idx = start; idx < end; idx++) { - if (str.getSegments()[0].get(str.getOffset() + idx) == split) { - if (idx == start + keyNameLen - && str.getSegments()[0].equalTo( - keyName.getSegments()[0], - str.getOffset() + start, - keyName.getOffset(), - keyNameLen)) { - int valueIdx = idx + 1; - int valueLen = end - valueIdx; - byte[] bytes = new byte[valueLen]; - str.getSegments()[0].get(str.getOffset() + valueIdx, bytes, 0, valueLen); - return fromBytes(bytes, 0, valueLen); - } else { - return null; - } - } - } - return null; - } - - private static BinaryStringData keyValueSlow( - BinaryStringData str, byte split1, byte split2, BinaryStringData keyName) { - // position in byte - int byteIdx = 0; - // position of last split1 - int lastSplit1Idx = -1; - while (byteIdx < str.getSizeInBytes()) { - // If find next split1 in str, process current kv - if (str.byteAt(byteIdx) == split1) { - int currentKeyIdx = lastSplit1Idx + 1; - BinaryStringData value = - findValueOfKeySlow(str, split2, keyName, currentKeyIdx, byteIdx); - if (value != null) { - return value; - } - lastSplit1Idx = byteIdx; - } - byteIdx++; - } - int currentKeyIdx = lastSplit1Idx + 1; - return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.getSizeInBytes()); - } - - private static BinaryStringData findValueOfKeySlow( - BinaryStringData str, byte split, BinaryStringData keyName, int start, int end) { - int keyNameLen = keyName.getSizeInBytes(); - for (int idx = start; idx < end; idx++) { - if (str.byteAt(idx) == split) { - if (idx == start + keyNameLen - && SegmentsUtil.equals( - str.getSegments(), - str.getOffset() + start, - keyName.getSegments(), - keyName.getOffset(), - keyNameLen)) { - int valueIdx = idx + 1; - byte[] bytes = - SegmentsUtil.copyToBytes( - str.getSegments(), str.getOffset() + valueIdx, end - valueIdx); - return fromBytes(bytes); - } else { - return null; - } - } - } - return null; - } - public static BinaryStringData substringSQL(BinaryStringData str, int pos) { return substringSQL(str, pos, Integer.MAX_VALUE); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java index 58dfbaf10b05f..7dd55b73a477b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java @@ -486,62 +486,6 @@ public static Matcher getRegexpMatcher(@Nullable StringData str, @Nullable Strin } } - /** - * Parse string as key-value string and return the value matches key name. example: - * keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2' keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL - * - * @param str target string. - * @param pairSeparator separator between key-value tuple. - * @param kvSeparator separator between key and value. - * @param keyName name of the key whose value you want return. - * @return target value. - */ - public static BinaryStringData keyValue( - BinaryStringData str, - BinaryStringData pairSeparator, - BinaryStringData kvSeparator, - BinaryStringData keyName) { - if (str == null || str.getSizeInBytes() == 0) { - return null; - } - if (pairSeparator != null - && pairSeparator.getSizeInBytes() == 1 - && kvSeparator != null - && kvSeparator.getSizeInBytes() == 1) { - return BinaryStringDataUtil.keyValue( - str, pairSeparator.byteAt(0), kvSeparator.byteAt(0), keyName); - } else { - return BinaryStringData.fromString( - keyValue( - BinaryStringDataUtil.safeToString(str), - BinaryStringDataUtil.safeToString(pairSeparator), - BinaryStringDataUtil.safeToString(kvSeparator), - BinaryStringDataUtil.safeToString(keyName))); - } - } - - private static String keyValue( - String str, String pairSeparator, String kvSeparator, String keyName) { - try { - if (StringUtils.isEmpty(str)) { - return null; - } - String[] values = StringUtils.split(str, pairSeparator); - for (String value : values) { - if (!StringUtils.isEmpty(value)) { - String[] kv = StringUtils.split(kvSeparator); - if (kv != null && kv.length == 2 && kv[0].equals(keyName)) { - return kv[1]; - } - } - } - return null; - } catch (Exception e) { - LOG.error("Exception when parse key-value", e); - return null; - } - } - /** * Calculate the hash value of a given string. * diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java index 160d5f7f4caeb..d6dcf64058fb2 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java @@ -44,7 +44,6 @@ import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concat; import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concatWs; import static org.apache.flink.table.data.binary.BinaryStringDataUtil.isEmpty; -import static org.apache.flink.table.data.binary.BinaryStringDataUtil.keyValue; import static org.apache.flink.table.data.binary.BinaryStringDataUtil.reverse; import static org.apache.flink.table.data.binary.BinaryStringDataUtil.splitByWholeSeparatorPreserveAllTokens; import static org.apache.flink.table.data.binary.BinaryStringDataUtil.substringSQL; @@ -627,137 +626,6 @@ void testEncodeWithIllegalCharacter() throws UnsupportedEncodingException { assertThat(StringUtf8Utils.encodeUTF8(str)).isEqualTo(str.getBytes("UTF-8")); } - @TestTemplate - void testKeyValue() { - assertThat( - keyValue( - fromString("k1:v1|k2:v2"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - fromString("k3"))) - .isNull(); - assertThat( - keyValue( - fromString("k1:v1|k2:v2|"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - fromString("k3"))) - .isNull(); - assertThat( - keyValue( - fromString("|k1:v1|k2:v2|"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - fromString("k3"))) - .isNull(); - String tab = org.apache.commons.lang3.StringEscapeUtils.unescapeJava("\t"); - assertThat( - keyValue( - fromString("k1:v1" + tab + "k2:v2"), - fromString("\t").byteAt(0), - fromString(":").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - assertThat( - keyValue( - fromString("k1:v1|k2:v2"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - null)) - .isNull(); - assertThat( - keyValue( - fromString("k1=v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - assertThat( - keyValue( - fromString("|k1=v1|k2=v2|"), - fromString("|").byteAt(0), - fromString("=").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - assertThat( - keyValue( - fromString("k1=v1||k2=v2"), - fromString("|").byteAt(0), - fromString("=").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - assertThat( - keyValue( - fromString("k1=v1;k2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k2"))) - .isNull(); - assertThat( - keyValue( - fromString("k1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1"))) - .isNull(); - assertThat( - keyValue( - fromString("k=1=v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k="))) - .isNull(); - assertThat( - keyValue( - fromString("k1==v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1"))) - .isEqualTo(fromString("=v1")); - assertThat( - keyValue( - fromString("k1==v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1="))) - .isNull(); - assertThat( - keyValue( - fromString("k1=v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1="))) - .isNull(); - assertThat( - keyValue( - fromString("k1k1=v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1"))) - .isNull(); - assertThat( - keyValue( - fromString("k1=v1;k2=v2"), - fromString(";").byteAt(0), - fromString("=").byteAt(0), - fromString("k1k1k1k1k1k1k1k1k1k1"))) - .isNull(); - assertThat( - keyValue( - fromString("k1:v||k2:v2"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - assertThat( - keyValue( - fromString("k1:v||k2:v2"), - fromString("|").byteAt(0), - fromString(":").byteAt(0), - fromString("k2"))) - .isEqualTo(fromString("v2")); - } - @TestTemplate void testDecodeWithIllegalUtf8Bytes() throws UnsupportedEncodingException {