Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -560,16 +560,6 @@ object StringCallGen {
}
}

def generateKeyValue(
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
returnType: LogicalType): GeneratedExpression = {
val className = classOf[SqlFunctionUtils].getCanonicalName
generateCallIfArgsNullable(ctx, returnType, operands) {
terms => s"$className.keyValue(${terms.mkString(",")})"
}
}

def generateHashCode(
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,121 +618,6 @@ public static TimestampData toTimestamp(
return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone);
}

/**
* Parse target string as key-value string and return the value matches key name. If accept any
* null arguments, return null. example: keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2'
* keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL
*
* @param split1 separator between key-value tuple.
* @param split2 separator between key and value.
* @param keyName name of the key whose value you want return.
* @return target value.
*/
public static BinaryStringData keyValue(
BinaryStringData str, byte split1, byte split2, BinaryStringData keyName) {
str.ensureMaterialized();
if (keyName == null || keyName.getSizeInBytes() == 0) {
return null;
}
if (str.inFirstSegment() && keyName.inFirstSegment()) {
// position in byte
int byteIdx = 0;
// position of last split1
int lastSplit1Idx = -1;
while (byteIdx < str.getSizeInBytes()) {
// If find next split1 in str, process current kv
if (str.getSegments()[0].get(str.getOffset() + byteIdx) == split1) {
int currentKeyIdx = lastSplit1Idx + 1;
// If key of current kv is keyName, return the value directly
BinaryStringData value =
findValueOfKey(str, split2, keyName, currentKeyIdx, byteIdx);
if (value != null) {
return value;
}
lastSplit1Idx = byteIdx;
}
byteIdx++;
}
// process the string which is not ends with split1
int currentKeyIdx = lastSplit1Idx + 1;
return findValueOfKey(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
} else {
return keyValueSlow(str, split1, split2, keyName);
}
}

private static BinaryStringData findValueOfKey(
BinaryStringData str, byte split, BinaryStringData keyName, int start, int end) {
int keyNameLen = keyName.getSizeInBytes();
for (int idx = start; idx < end; idx++) {
if (str.getSegments()[0].get(str.getOffset() + idx) == split) {
if (idx == start + keyNameLen
&& str.getSegments()[0].equalTo(
keyName.getSegments()[0],
str.getOffset() + start,
keyName.getOffset(),
keyNameLen)) {
int valueIdx = idx + 1;
int valueLen = end - valueIdx;
byte[] bytes = new byte[valueLen];
str.getSegments()[0].get(str.getOffset() + valueIdx, bytes, 0, valueLen);
return fromBytes(bytes, 0, valueLen);
} else {
return null;
}
}
}
return null;
}

private static BinaryStringData keyValueSlow(
BinaryStringData str, byte split1, byte split2, BinaryStringData keyName) {
// position in byte
int byteIdx = 0;
// position of last split1
int lastSplit1Idx = -1;
while (byteIdx < str.getSizeInBytes()) {
// If find next split1 in str, process current kv
if (str.byteAt(byteIdx) == split1) {
int currentKeyIdx = lastSplit1Idx + 1;
BinaryStringData value =
findValueOfKeySlow(str, split2, keyName, currentKeyIdx, byteIdx);
if (value != null) {
return value;
}
lastSplit1Idx = byteIdx;
}
byteIdx++;
}
int currentKeyIdx = lastSplit1Idx + 1;
return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
}

private static BinaryStringData findValueOfKeySlow(
BinaryStringData str, byte split, BinaryStringData keyName, int start, int end) {
int keyNameLen = keyName.getSizeInBytes();
for (int idx = start; idx < end; idx++) {
if (str.byteAt(idx) == split) {
if (idx == start + keyNameLen
&& SegmentsUtil.equals(
str.getSegments(),
str.getOffset() + start,
keyName.getSegments(),
keyName.getOffset(),
keyNameLen)) {
int valueIdx = idx + 1;
byte[] bytes =
SegmentsUtil.copyToBytes(
str.getSegments(), str.getOffset() + valueIdx, end - valueIdx);
return fromBytes(bytes);
} else {
return null;
}
}
}
return null;
}

public static BinaryStringData substringSQL(BinaryStringData str, int pos) {
return substringSQL(str, pos, Integer.MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,62 +486,6 @@ public static Matcher getRegexpMatcher(@Nullable StringData str, @Nullable Strin
}
}

/**
* Parse string as key-value string and return the value matches key name. example:
* keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2' keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL
*
* @param str target string.
* @param pairSeparator separator between key-value tuple.
* @param kvSeparator separator between key and value.
* @param keyName name of the key whose value you want return.
* @return target value.
*/
public static BinaryStringData keyValue(
BinaryStringData str,
BinaryStringData pairSeparator,
BinaryStringData kvSeparator,
BinaryStringData keyName) {
if (str == null || str.getSizeInBytes() == 0) {
return null;
}
if (pairSeparator != null
&& pairSeparator.getSizeInBytes() == 1
&& kvSeparator != null
&& kvSeparator.getSizeInBytes() == 1) {
return BinaryStringDataUtil.keyValue(
str, pairSeparator.byteAt(0), kvSeparator.byteAt(0), keyName);
} else {
return BinaryStringData.fromString(
keyValue(
BinaryStringDataUtil.safeToString(str),
BinaryStringDataUtil.safeToString(pairSeparator),
BinaryStringDataUtil.safeToString(kvSeparator),
BinaryStringDataUtil.safeToString(keyName)));
}
}

private static String keyValue(
String str, String pairSeparator, String kvSeparator, String keyName) {
try {
if (StringUtils.isEmpty(str)) {
return null;
}
String[] values = StringUtils.split(str, pairSeparator);
for (String value : values) {
if (!StringUtils.isEmpty(value)) {
String[] kv = StringUtils.split(kvSeparator);
if (kv != null && kv.length == 2 && kv[0].equals(keyName)) {
return kv[1];
}
}
}
return null;
} catch (Exception e) {
LOG.error("Exception when parse key-value", e);
return null;
}
}

/**
* Calculate the hash value of a given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concat;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concatWs;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.isEmpty;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.keyValue;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.reverse;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.splitByWholeSeparatorPreserveAllTokens;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.substringSQL;
Expand Down Expand Up @@ -627,137 +626,6 @@ void testEncodeWithIllegalCharacter() throws UnsupportedEncodingException {
assertThat(StringUtf8Utils.encodeUTF8(str)).isEqualTo(str.getBytes("UTF-8"));
}

@TestTemplate
void testKeyValue() {
assertThat(
keyValue(
fromString("k1:v1|k2:v2"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
fromString("k3")))
.isNull();
assertThat(
keyValue(
fromString("k1:v1|k2:v2|"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
fromString("k3")))
.isNull();
assertThat(
keyValue(
fromString("|k1:v1|k2:v2|"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
fromString("k3")))
.isNull();
String tab = org.apache.commons.lang3.StringEscapeUtils.unescapeJava("\t");
assertThat(
keyValue(
fromString("k1:v1" + tab + "k2:v2"),
fromString("\t").byteAt(0),
fromString(":").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
assertThat(
keyValue(
fromString("k1:v1|k2:v2"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
null))
.isNull();
assertThat(
keyValue(
fromString("k1=v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
assertThat(
keyValue(
fromString("|k1=v1|k2=v2|"),
fromString("|").byteAt(0),
fromString("=").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
assertThat(
keyValue(
fromString("k1=v1||k2=v2"),
fromString("|").byteAt(0),
fromString("=").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
assertThat(
keyValue(
fromString("k1=v1;k2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k2")))
.isNull();
assertThat(
keyValue(
fromString("k1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1")))
.isNull();
assertThat(
keyValue(
fromString("k=1=v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k=")))
.isNull();
assertThat(
keyValue(
fromString("k1==v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1")))
.isEqualTo(fromString("=v1"));
assertThat(
keyValue(
fromString("k1==v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1=")))
.isNull();
assertThat(
keyValue(
fromString("k1=v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1=")))
.isNull();
assertThat(
keyValue(
fromString("k1k1=v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1")))
.isNull();
assertThat(
keyValue(
fromString("k1=v1;k2=v2"),
fromString(";").byteAt(0),
fromString("=").byteAt(0),
fromString("k1k1k1k1k1k1k1k1k1k1")))
.isNull();
assertThat(
keyValue(
fromString("k1:v||k2:v2"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
assertThat(
keyValue(
fromString("k1:v||k2:v2"),
fromString("|").byteAt(0),
fromString(":").byteAt(0),
fromString("k2")))
.isEqualTo(fromString("v2"));
}

@TestTemplate
void testDecodeWithIllegalUtf8Bytes() throws UnsupportedEncodingException {

Expand Down