diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java index 0c5eea84aea5e..1f5aed4242958 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java @@ -35,6 +35,9 @@ import java.io.IOException; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** * A {@link TypeSerializer} for {@link ArrayData}. It should be noted that the header will not be * encoded. Currently Python doesn't support BinaryArrayData natively, so we can't use @@ -182,13 +185,15 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( ArrayDataSerializerSnapshot oldArrayDataSerializerSnapshot = (ArrayDataSerializerSnapshot) oldSerializerSnapshot; - if (!elementType.equals(oldArrayDataSerializerSnapshot.elementType) - || !elementTypeSerializer.equals( + + if (!isTypeCompatibleAfterNullabilityWidening( + elementType, oldArrayDataSerializerSnapshot.elementType) + || !isSerializerCompatibleAfterNullabilityWidening( + elementTypeSerializer, oldArrayDataSerializerSnapshot.elementTypeSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java index 
4e42e737c1f53..b76bd10d1ea52 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java @@ -38,6 +38,9 @@ import java.io.IOException; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** * A {@link TypeSerializer} for {@link MapData}. It should be noted that the header will not be * encoded. Currently Python doesn't support BinaryMapData natively, so we can't use @@ -237,14 +240,18 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( BaseMapSerializerSnapshot oldBaseMapDataSerializerSnapshot = (BaseMapSerializerSnapshot) oldSerializerSnapshot; - if (!keyType.equals(oldBaseMapDataSerializerSnapshot.keyType) - || !valueType.equals(oldBaseMapDataSerializerSnapshot.valueType) - || !keySerializer.equals(oldBaseMapDataSerializerSnapshot.keySerializer) - || !valueSerializer.equals(oldBaseMapDataSerializerSnapshot.valueSerializer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + keyType, oldBaseMapDataSerializerSnapshot.keyType) + || !isTypeCompatibleAfterNullabilityWidening( + valueType, oldBaseMapDataSerializerSnapshot.valueType) + || !isSerializerCompatibleAfterNullabilityWidening( + keySerializer, oldBaseMapDataSerializerSnapshot.keySerializer) + || !isSerializerCompatibleAfterNullabilityWidening( + valueSerializer, oldBaseMapDataSerializerSnapshot.valueSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java index 5b0d41141e01d..d627f44f1269a 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java @@ -35,11 +35,11 @@ import org.apache.flink.util.InstantiationUtil; import java.io.IOException; -import java.util.Arrays; import java.util.stream.IntStream; import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask; import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.areTypesCompatibleAfterNullabilityWidening; /** * A {@link TypeSerializer} for {@link RowData}. It should be noted that the row kind will be @@ -199,13 +199,14 @@ public RowDataSerializer restoreSerializer() { @Override public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( TypeSerializerSnapshot oldSerializerSnapshot) { - if (!(oldSerializerSnapshot instanceof RowDataSerializerSnapshot)) { + if (!(oldSerializerSnapshot + instanceof RowDataSerializerSnapshot oldRowDataSerializerSnapshot)) { return TypeSerializerSchemaCompatibility.incompatible(); } - RowDataSerializerSnapshot oldRowDataSerializerSnapshot = - (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. 
+ if (!areTypesCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java new file mode 100644 index 0000000000000..a040aa6bae162 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for serializer snapshots. 
*/ +class SerializerSnapshotTest { + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void sameTypesIsCompatibleAsIs(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new IntType(false))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void differentLogicalTypeIsIncompatible(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new BigIntType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void wideningNullabilityInNestedRowIsCompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(false), nestedRowType(true))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void narrowingNullabilityInNestedRowIsIncompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(true), nestedRowType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void wideningNullabilityInCascadingTypeIsCompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, previousType, currentType)) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void narrowingNullabilityInCascadingTypeIsIncompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, currentType, previousType)) + .is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + 
// Helpers + // ------------------------------------------------------------------------------------------- + + /** + * Round-trips the previous serializer's snapshot through serialization and resolves + * compatibility against the current serializer's snapshot. + */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + TypeSerializer previous = serializerSpec.create(previousType); + TypeSerializer current = serializerSpec.create(currentType); + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = serializerSpec.createSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + SerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } + + private static Stream serializerCascadingTypes() { + return serializerSpecs() + .flatMap( + serializerSpec -> + cascadingLogicalTypes() + .map(typeSpec -> typeSpec.toArguments(serializerSpec))); + } + + private static Stream> serializerSpecs() { + return Stream.of( + new SerializerSpec<>( + "ARRAY", + SerializerSnapshotTest::arraySerializer, + ArrayDataSerializer.ArrayDataSerializerSnapshot::new), + new SerializerSpec<>( + "ROW", + SerializerSnapshotTest::rowSerializer, + RowDataSerializer.RowDataSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_KEY", + SerializerSnapshotTest::mapKeySerializer, + MapDataSerializer.BaseMapSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_VALUE", + SerializerSnapshotTest::mapValueSerializer, + MapDataSerializer.BaseMapSerializerSnapshot::new)); + } + + private static RowDataSerializer rowSerializer(LogicalType... 
types) { + TypeSerializer[] fieldSerializers = new TypeSerializer[types.length]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = fieldSerializerFor(types[i]); + } + return new RowDataSerializer(types, fieldSerializers); + } + + private static ArrayDataSerializer arraySerializer(LogicalType elementType) { + return new ArrayDataSerializer(elementType, fieldSerializerFor(elementType)); + } + + private static MapDataSerializer mapKeySerializer(LogicalType keyType) { + LogicalType valueType = new IntType(false); + return new MapDataSerializer( + keyType, valueType, fieldSerializerFor(keyType), fieldSerializerFor(valueType)); + } + + private static MapDataSerializer mapValueSerializer(LogicalType valueType) { + LogicalType keyType = new VarCharType(false, 5); + return new MapDataSerializer( + keyType, valueType, fieldSerializerFor(keyType), fieldSerializerFor(valueType)); + } + + private static TypeSerializer fieldSerializerFor(LogicalType type) { + return InternalSerializers.create(type); + } + + private static Stream cascadingLogicalTypes() { + return Stream.of( + // Row with every nested nullability changed. + new CascadingLogicalTypeSpec(cascadingRowType(false), cascadingRowType(true)), + // Row with only leaf nullability changed. + new CascadingLogicalTypeSpec( + cascadingRowTypeWithLeafNullability(false), + cascadingRowTypeWithLeafNullability(true)), + // Array with cascading element type nullability changed. + new CascadingLogicalTypeSpec(cascadingArrayType(false), cascadingArrayType(true)), + // Map with cascading key and value type nullability changed. + new CascadingLogicalTypeSpec(cascadingMapType(false), cascadingMapType(true)), + // Multiset with cascading element type nullability changed. + new CascadingLogicalTypeSpec( + cascadingMultisetType(false), cascadingMultisetType(true)), + // Structured type with cascading attribute type nullability changed. 
+ new CascadingLogicalTypeSpec( + cascadingStructuredType(false), cascadingStructuredType(true)), + // Distinct type with cascading source type nullability changed. + new CascadingLogicalTypeSpec( + cascadingDistinctType(false), cascadingDistinctType(true))); + } + + private static RowType nestedRowType(boolean isNullable) { + return RowType.of( + new LogicalType[] {new IntType(isNullable), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + } + + private static RowType cascadingRowType(boolean isNullable) { + return new RowType( + isNullable, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + isNullable, + new VarCharType(isNullable, 5), + new RowType( + isNullable, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static RowType cascadingRowTypeWithLeafNullability(boolean isNullable) { + return new RowType( + false, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + false, + new VarCharType(false, 5), + new RowType( + false, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static ArrayType cascadingArrayType(boolean isNullable) { + return new ArrayType(isNullable, cascadingRowType(isNullable)); + } + + private static MapType cascadingMapType(boolean isNullable) { + return 
new MapType( + isNullable, new VarCharType(isNullable, 5), cascadingRowType(isNullable)); + } + + private static MultisetType cascadingMultisetType(boolean isNullable) { + return new MultisetType(isNullable, cascadingRowType(isNullable)); + } + + private static StructuredType cascadingStructuredType(boolean isNullable) { + return StructuredType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingStructuredType")) + .attributes( + Arrays.asList( + new StructuredAttribute("field", cascadingArrayType(isNullable)))) + .setNullable(isNullable) + .build(); + } + + private static DistinctType cascadingDistinctType(boolean isNullable) { + return DistinctType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingDistinctType"), + cascadingRowType(isNullable)) + .build(); + } + + private static final class SerializerSpec { + private final String name; + private final Function> serializerFactory; + private final Supplier> snapshotFactory; + + private SerializerSpec( + String name, + Function> serializerFactory, + Supplier> snapshotFactory) { + this.name = name; + this.serializerFactory = serializerFactory; + this.snapshotFactory = snapshotFactory; + } + + private TypeSerializer create(LogicalType type) { + return serializerFactory.apply(type); + } + + private TypeSerializerSnapshot createSnapshot() { + return snapshotFactory.get(); + } + + @Override + public String toString() { + return name; + } + } + + private static final class CascadingLogicalTypeSpec { + private final LogicalType previousType; + private final LogicalType currentType; + + private CascadingLogicalTypeSpec(LogicalType previousType, LogicalType currentType) { + this.previousType = previousType; + this.currentType = currentType; + } + + private Arguments toArguments(SerializerSpec serializerSpec) { + return Arguments.of(serializerSpec, previousType, currentType); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java index 98629b15fc177..d8678244cbfc7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java @@ -26,9 +26,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.StructuredType; @@ -50,11 +54,48 @@ public final class LogicalTypeUtils { private static final String ATOMIC_FIELD_NAME = "f0"; private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover(); + private static final NullabilityNormalizer NULLABILITY_NORMALIZER = new NullabilityNormalizer(); public static LogicalType removeTimeAttributes(LogicalType logicalType) { return logicalType.accept(TIME_ATTRIBUTE_REMOVER); } + public static LogicalType normalizeNullability(LogicalType logicalType) { + return logicalType.accept(NULLABILITY_NORMALIZER); + } + + /** + * Returns true when new types are structurally equal to old types ignoring nullability, and no + * type narrows from nullable to non-nullable. 
+ */ + public static boolean areTypesCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + if (!isTypeCompatibleAfterNullabilityWidening(newTypes[i], oldTypes[i])) { + return false; + } + } + return true; + } + + public static boolean isTypeCompatibleAfterNullabilityWidening( + LogicalType newType, LogicalType oldType) { + if (newType == oldType) { + return true; + } + if (newType == null || oldType == null) { + return false; + } + return normalizeNullability(newType).equals(normalizeNullability(oldType)) + && !hasNullabilityNarrowing(newType, oldType); + } + /** * Returns the conversion class for the given {@link LogicalType} that is used by the table * runtime as internal data structure. @@ -200,7 +241,79 @@ public LogicalType visit(LocalZonedTimestampType localZonedTimestampType) { } } - private LogicalTypeUtils() { - // no instantiation + private static boolean hasNullabilityNarrowing(LogicalType newType, LogicalType oldType) { + // reject narrowing: nullable -> non-nullable + if (oldType.isNullable() && !newType.isNullable()) { + return true; + } + + if (newType.is(LogicalTypeRoot.UNRESOLVED) || oldType.is(LogicalTypeRoot.UNRESOLVED)) { + return false; + } + + if (newType instanceof StructuredType && oldType instanceof StructuredType) { + StructuredType newStructuredType = (StructuredType) newType; + StructuredType oldStructuredType = (StructuredType) oldType; + if (newStructuredType.getSuperType().isPresent() + != oldStructuredType.getSuperType().isPresent()) { + return true; + } + if (newStructuredType.getSuperType().isPresent() + && hasNullabilityNarrowing( + newStructuredType.getSuperType().get(), + oldStructuredType.getSuperType().get())) { + return true; + } + } + + List newChildren = newType.getChildren(); + List 
oldChildren = oldType.getChildren(); + if (newChildren.size() != oldChildren.size()) { + return true; + } + for (int i = 0; i < newChildren.size(); i++) { + if (hasNullabilityNarrowing(newChildren.get(i), oldChildren.get(i))) { + return true; + } + } + return false; + } + + private static class NullabilityNormalizer extends LogicalTypeDuplicator { + + @Override + public LogicalType visit(ArrayType arrayType) { + return super.visit(arrayType).copy(true); + } + + @Override + public LogicalType visit(MultisetType multisetType) { + return super.visit(multisetType).copy(true); + } + + @Override + public LogicalType visit(MapType mapType) { + return super.visit(mapType).copy(true); + } + + @Override + public LogicalType visit(RowType rowType) { + return super.visit(rowType).copy(true); + } + + @Override + public LogicalType visit(DistinctType distinctType) { + return super.visit(distinctType).copy(true); + } + + @Override + public LogicalType visit(StructuredType structuredType) { + return super.visit(structuredType).copy(true); + } + + @Override + protected LogicalType defaultMethod(LogicalType logicalType) { + return logicalType.copy(true); + } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java new file mode 100644 index 0000000000000..dcb90739c51a5 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; + +/** Utils for serializer compatibility checks. */ +public class SerializerCheckUtils { + + public static boolean isSerializerCompatibleAfterNullabilityWidening( + TypeSerializer serializer, TypeSerializer previousSerializer) { + if (serializer.equals(previousSerializer)) { + return true; + } + + TypeSerializerSchemaCompatibility compatibility = + resolveSchemaCompatibility(serializer, previousSerializer); + return compatibility.isCompatibleAsIs(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static TypeSerializerSchemaCompatibility resolveSchemaCompatibility( + TypeSerializer serializer, TypeSerializer previousSerializer) { + TypeSerializerSnapshot previousSnapshot = previousSerializer.snapshotConfiguration(); + return serializer.snapshotConfiguration().resolveSchemaCompatibility(previousSnapshot); + } + + private SerializerCheckUtils() {} +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java new file mode 100644 index 0000000000000..3ddb20c989174 --- /dev/null +++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import 
org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
import org.apache.flink.table.types.logical.VarCharType;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs;
import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests for serializer snapshots.
 *
 * <p>Verifies that the snapshots of {@code ArrayDataSerializer}, {@code RowDataSerializer} and
 * {@code MapDataSerializer} treat a NOT NULL -> NULL change (nullability widening) of the
 * serialized {@link LogicalType} — including changes nested arbitrarily deep in rows, arrays,
 * maps, multisets, structured and distinct types — as {@code compatibleAsIs}, while the reverse
 * NULL -> NOT NULL narrowing and any other logical-type change resolve to {@code incompatible}.
 */
class SerializerSnapshotTest {

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerSpecs")
    void sameTypesIsCompatibleAsIs(SerializerSpec<?> serializerSpec) throws Exception {
        assertThat(resolveCompatibility(serializerSpec, new IntType(false), new IntType(false)))
                .is(isCompatibleAsIs());
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerSpecs")
    void differentLogicalTypeIsIncompatible(SerializerSpec<?> serializerSpec) throws Exception {
        assertThat(resolveCompatibility(serializerSpec, new IntType(false), new BigIntType(false)))
                .is(isIncompatible());
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerSpecs")
    void wideningNullabilityInNestedRowIsCompatible(SerializerSpec<?> serializerSpec)
            throws Exception {
        assertThat(resolveCompatibility(serializerSpec, nestedRowType(false), nestedRowType(true)))
                .is(isCompatibleAsIs());
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerSpecs")
    void narrowingNullabilityInNestedRowIsIncompatible(SerializerSpec<?> serializerSpec)
            throws Exception {
        assertThat(resolveCompatibility(serializerSpec, nestedRowType(true), nestedRowType(false)))
                .is(isIncompatible());
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerCascadingTypes")
    void wideningNullabilityInCascadingTypeIsCompatible(
            SerializerSpec<?> serializerSpec, LogicalType previousType, LogicalType currentType)
            throws Exception {
        assertThat(resolveCompatibility(serializerSpec, previousType, currentType))
                .is(isCompatibleAsIs());
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("serializerCascadingTypes")
    void narrowingNullabilityInCascadingTypeIsIncompatible(
            SerializerSpec<?> serializerSpec, LogicalType previousType, LogicalType currentType)
            throws Exception {
        // Arguments are swapped on purpose: restoring from the widened (nullable) type into the
        // original NOT NULL type must be rejected.
        assertThat(resolveCompatibility(serializerSpec, currentType, previousType))
                .is(isIncompatible());
    }

    // -------------------------------------------------------------------------------------------
    // Helpers
    // -------------------------------------------------------------------------------------------

    /**
     * Round-trips the previous serializer's snapshot through serialization and resolves
     * compatibility against the current serializer's snapshot.
     *
     * @param serializerSpec factory for the serializer and an empty snapshot under test
     * @param previousType logical type the state was originally written with
     * @param currentType logical type of the restoring serializer
     * @return the resolved schema compatibility of current vs. restored previous snapshot
     */
    private static <T> TypeSerializerSchemaCompatibility<T> resolveCompatibility(
            SerializerSpec<T> serializerSpec, LogicalType previousType, LogicalType currentType)
            throws Exception {
        TypeSerializer<T> previous = serializerSpec.create(previousType);
        TypeSerializer<T> current = serializerSpec.create(currentType);
        TypeSerializerSnapshot<T> previousSnapshot = previous.snapshotConfiguration();

        DataOutputSerializer out = new DataOutputSerializer(64);
        previousSnapshot.writeSnapshot(out);

        // Read the snapshot back through a fresh instance to exercise the persisted form.
        TypeSerializerSnapshot<T> restoredPreviousSnapshot = serializerSpec.createSnapshot();
        restoredPreviousSnapshot.readSnapshot(
                previousSnapshot.getCurrentVersion(),
                new DataInputDeserializer(out.getCopyOfBuffer()),
                SerializerSnapshotTest.class.getClassLoader());

        return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot);
    }

    /** Cross product of every serializer spec with every cascading logical-type pair. */
    private static Stream<Arguments> serializerCascadingTypes() {
        return serializerSpecs()
                .flatMap(
                        serializerSpec ->
                                cascadingLogicalTypes()
                                        .map(typeSpec -> typeSpec.toArguments(serializerSpec)));
    }

    /** The serializers under test; the name is used as the parameterized display name. */
    private static Stream<SerializerSpec<?>> serializerSpecs() {
        return Stream.of(
                new SerializerSpec<>(
                        "ARRAY",
                        ArrayDataSerializer::new,
                        ArrayDataSerializer.ArrayDataSerializerSnapshot::new),
                new SerializerSpec<>(
                        "ROW",
                        RowDataSerializer::new,
                        RowDataSerializer.RowDataSerializerSnapshot::new),
                new SerializerSpec<>(
                        "MAP_KEY",
                        SerializerSnapshotTest::mapKeySerializer,
                        MapDataSerializer.MapDataSerializerSnapshot::new),
                new SerializerSpec<>(
                        "MAP_VALUE",
                        SerializerSnapshotTest::mapValueSerializer,
                        MapDataSerializer.MapDataSerializerSnapshot::new));
    }

    /** Map serializer whose key type varies and whose value type is fixed. */
    private static MapDataSerializer mapKeySerializer(LogicalType keyType) {
        return new MapDataSerializer(keyType, new IntType(false));
    }

    /** Map serializer whose value type varies and whose key type is fixed. */
    private static MapDataSerializer mapValueSerializer(LogicalType valueType) {
        return new MapDataSerializer(new VarCharType(false, 5), valueType);
    }

    private static Stream<CascadingLogicalTypeSpec> cascadingLogicalTypes() {
        return Stream.of(
                // Row with every nested nullability changed.
                new CascadingLogicalTypeSpec(cascadingRowType(false), cascadingRowType(true)),
                // Row with only leaf nullability changed.
                new CascadingLogicalTypeSpec(
                        cascadingRowTypeWithLeafNullability(false),
                        cascadingRowTypeWithLeafNullability(true)),
                // Array with cascading element type nullability changed.
                new CascadingLogicalTypeSpec(cascadingArrayType(false), cascadingArrayType(true)),
                // Map with cascading key and value type nullability changed.
                new CascadingLogicalTypeSpec(cascadingMapType(false), cascadingMapType(true)),
                // Multiset with cascading element type nullability changed.
                new CascadingLogicalTypeSpec(
                        cascadingMultisetType(false), cascadingMultisetType(true)),
                // Structured type with cascading attribute type nullability changed.
                new CascadingLogicalTypeSpec(
                        cascadingStructuredType(false), cascadingStructuredType(true)),
                // Distinct type with cascading source type nullability changed.
                new CascadingLogicalTypeSpec(
                        cascadingDistinctType(false), cascadingDistinctType(true)));
    }

    /** Two-field row whose first field's nullability is controlled by the parameter. */
    private static RowType nestedRowType(boolean isNullable) {
        return RowType.of(
                new LogicalType[] {new IntType(isNullable), new VarCharType(false, 100)},
                new String[] {"a", "b"});
    }

    /** Row whose nullability flag is applied at every nesting level (container and leaf). */
    private static RowType cascadingRowType(boolean isNullable) {
        return new RowType(
                isNullable,
                Arrays.asList(
                        new RowField(
                                "arrayField",
                                new ArrayType(
                                        isNullable,
                                        new RowType(
                                                isNullable,
                                                Arrays.asList(
                                                        new RowField(
                                                                "element",
                                                                new IntType(isNullable)))))),
                        new RowField(
                                "mapField",
                                new MapType(
                                        isNullable,
                                        new VarCharType(isNullable, 5),
                                        new RowType(
                                                isNullable,
                                                Arrays.asList(
                                                        new RowField(
                                                                "value",
                                                                new BigIntType(isNullable)))))),
                        new RowField(
                                "multisetField",
                                new MultisetType(
                                        isNullable,
                                        new RowType(
                                                isNullable,
                                                Arrays.asList(
                                                        new RowField(
                                                                "nested",
                                                                new BooleanType(
                                                                        isNullable))))))));
    }

    /** Same shape as {@link #cascadingRowType} but only the leaf types' nullability varies. */
    private static RowType cascadingRowTypeWithLeafNullability(boolean isNullable) {
        return new RowType(
                false,
                Arrays.asList(
                        new RowField(
                                "arrayField",
                                new ArrayType(
                                        false,
                                        new RowType(
                                                false,
                                                Arrays.asList(
                                                        new RowField(
                                                                "element",
                                                                new IntType(isNullable)))))),
                        new RowField(
                                "mapField",
                                new MapType(
                                        false,
                                        new VarCharType(false, 5),
                                        new RowType(
                                                false,
                                                Arrays.asList(
                                                        new RowField(
                                                                "value",
                                                                new BigIntType(isNullable)))))),
                        new RowField(
                                "multisetField",
                                new MultisetType(
                                        false,
                                        new RowType(
                                                false,
                                                Arrays.asList(
                                                        new RowField(
                                                                "nested",
                                                                new BooleanType(
                                                                        isNullable))))))));
    }

    private static ArrayType cascadingArrayType(boolean isNullable) {
        return new ArrayType(isNullable, cascadingRowType(isNullable));
    }

    private static MapType cascadingMapType(boolean isNullable) {
        return new MapType(
                isNullable, new VarCharType(isNullable, 5), cascadingRowType(isNullable));
    }

    private static MultisetType cascadingMultisetType(boolean isNullable) {
        return new MultisetType(isNullable, cascadingRowType(isNullable));
    }

    private static StructuredType cascadingStructuredType(boolean isNullable) {
        return StructuredType.newBuilder(
                        ObjectIdentifier.of("cat", "db", "CascadingStructuredType"))
                .attributes(
                        Arrays.asList(
                                new StructuredAttribute("field", cascadingArrayType(isNullable))))
                .setNullable(isNullable)
                .build();
    }

    private static DistinctType cascadingDistinctType(boolean isNullable) {
        return DistinctType.newBuilder(
                        ObjectIdentifier.of("cat", "db", "CascadingDistinctType"),
                        cascadingRowType(isNullable))
                .build();
    }

    /**
     * Bundles a serializer factory with the factory for an empty snapshot of the matching
     * snapshot class; {@code T} is the serialized data type (e.g. {@code ArrayData}).
     */
    private static final class SerializerSpec<T> {
        private final String name;
        private final Function<LogicalType, TypeSerializer<T>> serializerFactory;
        private final Supplier<TypeSerializerSnapshot<T>> snapshotFactory;

        private SerializerSpec(
                String name,
                Function<LogicalType, TypeSerializer<T>> serializerFactory,
                Supplier<TypeSerializerSnapshot<T>> snapshotFactory) {
            this.name = name;
            this.serializerFactory = serializerFactory;
            this.snapshotFactory = snapshotFactory;
        }

        private TypeSerializer<T> create(LogicalType type) {
            return serializerFactory.apply(type);
        }

        private TypeSerializerSnapshot<T> createSnapshot() {
            return snapshotFactory.get();
        }

        @Override
        public String toString() {
            // Used as the @ParameterizedTest display name ("{0}").
            return name;
        }
    }

    /** A pair of logical types: the previously written type and the current (widened) type. */
    private static final class CascadingLogicalTypeSpec {
        private final LogicalType previousType;
        private final LogicalType currentType;

        private CascadingLogicalTypeSpec(LogicalType previousType, LogicalType currentType) {
            this.previousType = previousType;
            this.currentType = currentType;
        }

        private Arguments toArguments(SerializerSpec<?> serializerSpec) {
            return Arguments.of(serializerSpec, previousType, currentType);
        }
    }
}
*/ @Internal public class ArrayDataSerializer extends TypeSerializer { @@ -333,12 +336,14 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( ArrayDataSerializerSnapshot oldArrayDataSerializerSnapshot = (ArrayDataSerializerSnapshot) oldSerializerSnapshot; - if (!eleType.equals(oldArrayDataSerializerSnapshot.eleType) - || !eleSer.equals(oldArrayDataSerializerSnapshot.eleSer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + eleType, oldArrayDataSerializerSnapshot.eleType) + || !isSerializerCompatibleAfterNullabilityWidening( + eleSer, oldArrayDataSerializerSnapshot.eleSer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java index cd58a4bc0a3f2..f37c58fffad08 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java @@ -40,6 +40,9 @@ import java.io.IOException; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** Serializer for {@link MapData}. 
*/ @Internal public class MapDataSerializer extends TypeSerializer { @@ -323,14 +326,18 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( MapDataSerializerSnapshot previousMapDataSerializerSnapshot = (MapDataSerializerSnapshot) oldSerializerSnapshot; - if (!keyType.equals(previousMapDataSerializerSnapshot.keyType) - || !valueType.equals(previousMapDataSerializerSnapshot.valueType) - || !keySerializer.equals(previousMapDataSerializerSnapshot.keySerializer) - || !valueSerializer.equals(previousMapDataSerializerSnapshot.valueSerializer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + keyType, previousMapDataSerializerSnapshot.keyType) + || !isTypeCompatibleAfterNullabilityWidening( + valueType, previousMapDataSerializerSnapshot.valueType) + || !isSerializerCompatibleAfterNullabilityWidening( + keySerializer, previousMapDataSerializerSnapshot.keySerializer) + || !isSerializerCompatibleAfterNullabilityWidening( + valueSerializer, previousMapDataSerializerSnapshot.valueSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java index 7a634f67efe04..585406f310ff4 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java @@ -44,6 +44,8 @@ import java.util.Arrays; import java.util.stream.IntStream; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.areTypesCompatibleAfterNullabilityWidening; + /** Serializer for {@link RowData}. 
*/ @Internal public class RowDataSerializer extends AbstractRowDataSerializer { @@ -344,7 +346,9 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. + if (!areTypesCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); }