From d922445285acbcdc15f3af646f9b70276e9d5dc8 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Thu, 30 Apr 2026 11:12:15 +0800 Subject: [PATCH 1/2] [FLINK-37240][table] RowDataSerializer relaxes NOT NULL to NULL as a state compatibility change. --- .../serializers/python/RowDataSerializer.java | 32 +++- .../python/RowDataSerializerSnapshotTest.java | 142 +++++++++++++++++ .../RowDataSerializerSnapshotTest.java | 147 ++++++++++++++++++ .../runtime/typeutils/RowDataSerializer.java | 31 +++- 4 files changed, 349 insertions(+), 3 deletions(-) create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java index 5b0d41141e01d..faafe24145cd7 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java @@ -35,7 +35,6 @@ import org.apache.flink.util.InstantiationUtil; import java.io.IOException; -import java.util.Arrays; import java.util.stream.IntStream; import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask; @@ -205,7 +204,9 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. 
+ if (!typesAreCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -225,5 +226,32 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } + + /** + * Returns true when new field types are structurally equal to old ones ignoring top-level + * nullability, and no field narrows from nullable to non-nullable. + */ + private static boolean typesAreCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + LogicalType newType = newTypes[i]; + LogicalType oldType = oldTypes[i]; + // structurally equal except (possibly) for top-level nullability + if (!newType.copy(true).equals(oldType.copy(true))) { + return false; + } + // reject narrowing: nullable -> non-nullable + if (oldType.isNullable() && !newType.isNullable()) { + return false; + } + } + return true; + } } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java new file mode 100644 index 0000000000000..4e9e274dd68a6 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.RowDataSerializerSnapshot; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowDataSerializerSnapshot}. 
*/ +class RowDataSerializerSnapshotTest { + + @Test + void sameTypesIsCompatibleAsIs() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. */ + @Test + void wideningFieldNullabilityIsCompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(true, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ + @Test + void narrowingFieldNullabilityIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(true), new VarCharType(true, 100)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** Mixed nullability changes should be incompatible when any field narrows. 
*/ + @Test + void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(true, 100)); + // INT widened, VARCHAR narrowed + RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldCountIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldTypeIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false)); + RowDataSerializer current = serializerOf(new BigIntType(false)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------- + + private static RowDataSerializer serializerOf(LogicalType... 
types) { + TypeSerializer[] fieldSerializers = new TypeSerializer[types.length]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = fieldSerializerFor(types[i]); + } + return new RowDataSerializer(types, fieldSerializers); + } + + private static TypeSerializer fieldSerializerFor(LogicalType type) { + switch (type.getTypeRoot()) { + case INTEGER: + return IntSerializer.INSTANCE; + case BIGINT: + return LongSerializer.INSTANCE; + case VARCHAR: + case CHAR: + return StringSerializer.INSTANCE; + default: + throw new IllegalArgumentException("Unsupported type for test: " + type); + } + } + + /** + * Round-trips the previous serializer's snapshot through serialization (mirroring what happens + * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. + */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + RowDataSerializer previous, RowDataSerializer current) throws Exception { + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + RowDataSerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java new file mode 100644 index 0000000000000..513801ac98a2d --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java @@ -0,0 +1,147 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowDataSerializerSnapshot}. 
*/ +class RowDataSerializerSnapshotTest { + + @Test + void sameTypesIsCompatibleAsIs() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. */ + @Test + void wideningFieldNullabilityIsCompatible() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ + @Test + void narrowingFieldNullabilityIsIncompatible() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** Mixed nullability changes should be incompatible when any field narrows. 
*/ + @Test + void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(true, 100)); + // INT widened, VARCHAR narrowed + RowDataSerializer current = + new RowDataSerializer(new IntType(true), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldCountIsIncompatible() throws Exception { + RowDataSerializer previous = new RowDataSerializer(new IntType(false)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldTypeIsIncompatible() throws Exception { + RowDataSerializer previous = new RowDataSerializer(new IntType(false)); + RowDataSerializer current = new RowDataSerializer(new BigIntType(false)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** + * Nullability changes in nested row fields (even widening) are not supported and should be + * incompatible. 
+ */ + @Test + void nullabilityChangeInNestedRowFieldIsIncompatible() throws Exception { + RowType nestedNotNull = + RowType.of( + new LogicalType[] {new IntType(false), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + RowType nestedNullable = + RowType.of( + new LogicalType[] {new IntType(true), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + + RowDataSerializer previous = new RowDataSerializer(new LogicalType[] {nestedNotNull}); + RowDataSerializer current = new RowDataSerializer(new LogicalType[] {nestedNullable}); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------- + + /** + * Round-trips the previous serializer's snapshot through serialization (mirroring what happens + * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. 
+ */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + RowDataSerializer previous, RowDataSerializer current) throws Exception { + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + RowDataSerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } +} diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java index 7a634f67efe04..8d419f15212f4 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java @@ -344,7 +344,9 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. 
+ if (!typesAreCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -364,5 +366,32 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } + + /** + * Returns true when new field types are structurally equal to old ones ignoring top-level + * nullability, and no field narrows from nullable to non-nullable. + */ + private static boolean typesAreCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + LogicalType newType = newTypes[i]; + LogicalType oldType = oldTypes[i]; + // structurally equal except (possibly) for top-level nullability + if (!newType.copy(true).equals(oldType.copy(true))) { + return false; + } + // reject narrowing: nullable -> non-nullable + if (oldType.isNullable() && !newType.isNullable()) { + return false; + } + } + return true; + } } } From e0dbacf5c055bc38bbc579f15ba68fec3ac23932 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Fri, 8 May 2026 16:09:48 +0800 Subject: [PATCH 2/2] solve comments --- .../python/ArrayDataSerializer.java | 13 +- .../serializers/python/MapDataSerializer.java | 19 +- .../serializers/python/RowDataSerializer.java | 35 +- .../python/RowDataSerializerSnapshotTest.java | 142 ------- .../python/SerializerSnapshotTest.java | 363 ++++++++++++++++++ .../types/logical/utils/LogicalTypeUtils.java | 117 +++++- .../typeutils/SerializerCheckUtils.java | 47 +++ .../RowDataSerializerSnapshotTest.java | 147 ------- .../typeutils/SerializerSnapshotTest.java | 342 +++++++++++++++++ .../typeutils/ArrayDataSerializer.java | 13 +- .../runtime/typeutils/MapDataSerializer.java | 19 +- .../runtime/typeutils/RowDataSerializer.java | 31 +- 12 
files changed, 918 insertions(+), 370 deletions(-) delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java delete mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java index 0c5eea84aea5e..1f5aed4242958 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrayDataSerializer.java @@ -35,6 +35,9 @@ import java.io.IOException; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** * A {@link TypeSerializer} for {@link ArrayData}. It should be noted that the header will not be * encoded. 
Currently Python doesn't support BinaryArrayData natively, so we can't use @@ -182,13 +185,15 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( ArrayDataSerializerSnapshot oldArrayDataSerializerSnapshot = (ArrayDataSerializerSnapshot) oldSerializerSnapshot; - if (!elementType.equals(oldArrayDataSerializerSnapshot.elementType) - || !elementTypeSerializer.equals( + + if (!isTypeCompatibleAfterNullabilityWidening( + elementType, oldArrayDataSerializerSnapshot.elementType) + || !isSerializerCompatibleAfterNullabilityWidening( + elementTypeSerializer, oldArrayDataSerializerSnapshot.elementTypeSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java index 4e42e737c1f53..b76bd10d1ea52 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java @@ -38,6 +38,9 @@ import java.io.IOException; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** * A {@link TypeSerializer} for {@link MapData}. It should be noted that the header will not be * encoded. 
Currently Python doesn't support BinaryMapData natively, so we can't use @@ -237,14 +240,18 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( BaseMapSerializerSnapshot oldBaseMapDataSerializerSnapshot = (BaseMapSerializerSnapshot) oldSerializerSnapshot; - if (!keyType.equals(oldBaseMapDataSerializerSnapshot.keyType) - || !valueType.equals(oldBaseMapDataSerializerSnapshot.valueType) - || !keySerializer.equals(oldBaseMapDataSerializerSnapshot.keySerializer) - || !valueSerializer.equals(oldBaseMapDataSerializerSnapshot.valueSerializer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + keyType, oldBaseMapDataSerializerSnapshot.keyType) + || !isTypeCompatibleAfterNullabilityWidening( + valueType, oldBaseMapDataSerializerSnapshot.valueType) + || !isSerializerCompatibleAfterNullabilityWidening( + keySerializer, oldBaseMapDataSerializerSnapshot.keySerializer) + || !isSerializerCompatibleAfterNullabilityWidening( + valueSerializer, oldBaseMapDataSerializerSnapshot.valueSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java index faafe24145cd7..d627f44f1269a 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java @@ -39,6 +39,7 @@ import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask; import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask; +import static 
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.areTypesCompatibleAfterNullabilityWidening; /** * A {@link TypeSerializer} for {@link RowData}. It should be noted that the row kind will be @@ -198,14 +199,13 @@ public RowDataSerializer restoreSerializer() { @Override public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( TypeSerializerSnapshot oldSerializerSnapshot) { - if (!(oldSerializerSnapshot instanceof RowDataSerializerSnapshot)) { + if (!(oldSerializerSnapshot + instanceof RowDataSerializerSnapshot oldRowDataSerializerSnapshot)) { return TypeSerializerSchemaCompatibility.incompatible(); } - RowDataSerializerSnapshot oldRowDataSerializerSnapshot = - (RowDataSerializerSnapshot) oldSerializerSnapshot; // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. - if (!typesAreCompatibleAfterNullabilityWidening( + if (!areTypesCompatibleAfterNullabilityWidening( types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -226,32 +226,5 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } - - /** - * Returns true when new field types are structurally equal to old ones ignoring top-level - * nullability, and no field narrows from nullable to non-nullable. 
- */ - private static boolean typesAreCompatibleAfterNullabilityWidening( - LogicalType[] newTypes, LogicalType[] oldTypes) { - if (newTypes == oldTypes) { - return true; - } - if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { - return false; - } - for (int i = 0; i < newTypes.length; i++) { - LogicalType newType = newTypes[i]; - LogicalType oldType = oldTypes[i]; - // structurally equal except (possibly) for top-level nullability - if (!newType.copy(true).equals(oldType.copy(true))) { - return false; - } - // reject narrowing: nullable -> non-nullable - if (oldType.isNullable() && !newType.isNullable()) { - return false; - } - } - return true; - } } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java deleted file mode 100644 index 4e9e274dd68a6..0000000000000 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.runtime.typeutils.serializers.python; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.RowDataSerializerSnapshot; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.VarCharType; - -import org.junit.jupiter.api.Test; - -import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; -import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link RowDataSerializerSnapshot}. */ -class RowDataSerializerSnapshotTest { - - @Test - void sameTypesIsCompatibleAsIs() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); - RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); - } - - /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. 
*/ - @Test - void wideningFieldNullabilityIsCompatible() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); - RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(true, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); - } - - /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ - @Test - void narrowingFieldNullabilityIsIncompatible() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(true), new VarCharType(true, 100)); - RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - /** Mixed nullability changes should be incompatible when any field narrows. */ - @Test - void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(true, 100)); - // INT widened, VARCHAR narrowed - RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - @Test - void differentFieldCountIsIncompatible() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(false)); - RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - @Test - void differentFieldTypeIsIncompatible() throws Exception { - RowDataSerializer previous = serializerOf(new IntType(false)); - RowDataSerializer current = serializerOf(new BigIntType(false)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - // ------------------------------------------------------------------------------------------- - // Helpers - // 
------------------------------------------------------------------------------------------- - - private static RowDataSerializer serializerOf(LogicalType... types) { - TypeSerializer[] fieldSerializers = new TypeSerializer[types.length]; - for (int i = 0; i < types.length; i++) { - fieldSerializers[i] = fieldSerializerFor(types[i]); - } - return new RowDataSerializer(types, fieldSerializers); - } - - private static TypeSerializer fieldSerializerFor(LogicalType type) { - switch (type.getTypeRoot()) { - case INTEGER: - return IntSerializer.INSTANCE; - case BIGINT: - return LongSerializer.INSTANCE; - case VARCHAR: - case CHAR: - return StringSerializer.INSTANCE; - default: - throw new IllegalArgumentException("Unsupported type for test: " + type); - } - } - - /** - * Round-trips the previous serializer's snapshot through serialization (mirroring what happens - * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. - */ - private static TypeSerializerSchemaCompatibility resolveCompatibility( - RowDataSerializer previous, RowDataSerializer current) throws Exception { - TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); - - DataOutputSerializer out = new DataOutputSerializer(64); - previousSnapshot.writeSnapshot(out); - - TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); - restoredPreviousSnapshot.readSnapshot( - previousSnapshot.getCurrentVersion(), - new DataInputDeserializer(out.getCopyOfBuffer()), - RowDataSerializerSnapshotTest.class.getClassLoader()); - - return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); - } -} diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java new file mode 100644 index 0000000000000..a040aa6bae162 --- /dev/null +++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/SerializerSnapshotTest.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; 
+import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for serializer snapshots. */ +class SerializerSnapshotTest { + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void sameTypesIsCompatibleAsIs(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new IntType(false))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void differentLogicalTypeIsIncompatible(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new BigIntType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void wideningNullabilityInNestedRowIsCompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(false), nestedRowType(true))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void narrowingNullabilityInNestedRowIsIncompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(true), 
nestedRowType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void wideningNullabilityInCascadingTypeIsCompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, previousType, currentType)) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void narrowingNullabilityInCascadingTypeIsIncompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, currentType, previousType)) + .is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------- + + /** + * Round-trips the previous serializer's snapshot through serialization and resolves + * compatibility against the current serializer's snapshot. 
+ */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + SerializerSpec<?> serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + TypeSerializer previous = serializerSpec.create(previousType); + TypeSerializer current = serializerSpec.create(currentType); + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = serializerSpec.createSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + SerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } + + private static Stream<Arguments> serializerCascadingTypes() { + return serializerSpecs() + .flatMap( + serializerSpec -> + cascadingLogicalTypes() + .map(typeSpec -> typeSpec.toArguments(serializerSpec))); + } + + private static Stream<SerializerSpec<?>> serializerSpecs() { + return Stream.of( + new SerializerSpec<>( + "ARRAY", + SerializerSnapshotTest::arraySerializer, + ArrayDataSerializer.ArrayDataSerializerSnapshot::new), + new SerializerSpec<>( + "ROW", + SerializerSnapshotTest::rowSerializer, + RowDataSerializer.RowDataSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_KEY", + SerializerSnapshotTest::mapKeySerializer, + MapDataSerializer.BaseMapSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_VALUE", + SerializerSnapshotTest::mapValueSerializer, + MapDataSerializer.BaseMapSerializerSnapshot::new)); + } + + private static RowDataSerializer rowSerializer(LogicalType... 
types) { + TypeSerializer[] fieldSerializers = new TypeSerializer[types.length]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = fieldSerializerFor(types[i]); + } + return new RowDataSerializer(types, fieldSerializers); + } + + private static ArrayDataSerializer arraySerializer(LogicalType elementType) { + return new ArrayDataSerializer(elementType, fieldSerializerFor(elementType)); + } + + private static MapDataSerializer mapKeySerializer(LogicalType keyType) { + LogicalType valueType = new IntType(false); + return new MapDataSerializer( + keyType, valueType, fieldSerializerFor(keyType), fieldSerializerFor(valueType)); + } + + private static MapDataSerializer mapValueSerializer(LogicalType valueType) { + LogicalType keyType = new VarCharType(false, 5); + return new MapDataSerializer( + keyType, valueType, fieldSerializerFor(keyType), fieldSerializerFor(valueType)); + } + + private static TypeSerializer fieldSerializerFor(LogicalType type) { + return InternalSerializers.create(type); + } + + private static Stream cascadingLogicalTypes() { + return Stream.of( + // Row with every nested nullability changed. + new CascadingLogicalTypeSpec(cascadingRowType(false), cascadingRowType(true)), + // Row with only leaf nullability changed. + new CascadingLogicalTypeSpec( + cascadingRowTypeWithLeafNullability(false), + cascadingRowTypeWithLeafNullability(true)), + // Array with cascading element type nullability changed. + new CascadingLogicalTypeSpec(cascadingArrayType(false), cascadingArrayType(true)), + // Map with cascading key and value type nullability changed. + new CascadingLogicalTypeSpec(cascadingMapType(false), cascadingMapType(true)), + // Multiset with cascading element type nullability changed. + new CascadingLogicalTypeSpec( + cascadingMultisetType(false), cascadingMultisetType(true)), + // Structured type with cascading attribute type nullability changed. 
+ new CascadingLogicalTypeSpec( + cascadingStructuredType(false), cascadingStructuredType(true)), + // Distinct type with cascading source type nullability changed. + new CascadingLogicalTypeSpec( + cascadingDistinctType(false), cascadingDistinctType(true))); + } + + private static RowType nestedRowType(boolean isNullable) { + return RowType.of( + new LogicalType[] {new IntType(isNullable), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + } + + private static RowType cascadingRowType(boolean isNullable) { + return new RowType( + isNullable, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + isNullable, + new VarCharType(isNullable, 5), + new RowType( + isNullable, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static RowType cascadingRowTypeWithLeafNullability(boolean isNullable) { + return new RowType( + false, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + false, + new VarCharType(false, 5), + new RowType( + false, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static ArrayType cascadingArrayType(boolean isNullable) { + return new ArrayType(isNullable, cascadingRowType(isNullable)); + } + + private static MapType cascadingMapType(boolean isNullable) { + return 
new MapType( + isNullable, new VarCharType(isNullable, 5), cascadingRowType(isNullable)); + } + + private static MultisetType cascadingMultisetType(boolean isNullable) { + return new MultisetType(isNullable, cascadingRowType(isNullable)); + } + + private static StructuredType cascadingStructuredType(boolean isNullable) { + return StructuredType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingStructuredType")) + .attributes( + Arrays.asList( + new StructuredAttribute("field", cascadingArrayType(isNullable)))) + .setNullable(isNullable) + .build(); + } + + private static DistinctType cascadingDistinctType(boolean isNullable) { + return DistinctType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingDistinctType"), + cascadingRowType(isNullable)) + .build(); + } + + private static final class SerializerSpec<T> { + private final String name; + private final Function<LogicalType, TypeSerializer<T>> serializerFactory; + private final Supplier<TypeSerializerSnapshot<T>> snapshotFactory; + + private SerializerSpec( + String name, + Function<LogicalType, TypeSerializer<T>> serializerFactory, + Supplier<TypeSerializerSnapshot<T>> snapshotFactory) { + this.name = name; + this.serializerFactory = serializerFactory; + this.snapshotFactory = snapshotFactory; + } + + private TypeSerializer<T> create(LogicalType type) { + return serializerFactory.apply(type); + } + + private TypeSerializerSnapshot<T> createSnapshot() { + return snapshotFactory.get(); + } + + @Override + public String toString() { + return name; + } + } + + private static final class CascadingLogicalTypeSpec { + private final LogicalType previousType; + private final LogicalType currentType; + + private CascadingLogicalTypeSpec(LogicalType previousType, LogicalType currentType) { + this.previousType = previousType; + this.currentType = currentType; + } + + private Arguments toArguments(SerializerSpec<?> serializerSpec) { + return Arguments.of(serializerSpec, previousType, currentType); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java index 98629b15fc177..d8678244cbfc7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java @@ -26,9 +26,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.StructuredType; @@ -50,11 +54,48 @@ public final class LogicalTypeUtils { private static final String ATOMIC_FIELD_NAME = "f0"; private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover(); + private static final NullabilityNormalizer NULLABILITY_NORMALIZER = new NullabilityNormalizer(); public static LogicalType removeTimeAttributes(LogicalType logicalType) { return logicalType.accept(TIME_ATTRIBUTE_REMOVER); } + public static LogicalType normalizeNullability(LogicalType logicalType) { + return logicalType.accept(NULLABILITY_NORMALIZER); + } + + /** + * Returns true when new types are structurally equal to old types ignoring nullability, and no + * type narrows from nullable to non-nullable. 
+ */ + public static boolean areTypesCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + if (!isTypeCompatibleAfterNullabilityWidening(newTypes[i], oldTypes[i])) { + return false; + } + } + return true; + } + + public static boolean isTypeCompatibleAfterNullabilityWidening( + LogicalType newType, LogicalType oldType) { + if (newType == oldType) { + return true; + } + if (newType == null || oldType == null) { + return false; + } + return normalizeNullability(newType).equals(normalizeNullability(oldType)) + && !hasNullabilityNarrowing(newType, oldType); + } + /** * Returns the conversion class for the given {@link LogicalType} that is used by the table * runtime as internal data structure. @@ -200,7 +241,79 @@ public LogicalType visit(LocalZonedTimestampType localZonedTimestampType) { } } - private LogicalTypeUtils() { - // no instantiation + private static boolean hasNullabilityNarrowing(LogicalType newType, LogicalType oldType) { + // reject narrowing: nullable -> non-nullable + if (oldType.isNullable() && !newType.isNullable()) { + return true; + } + + if (newType.is(LogicalTypeRoot.UNRESOLVED) || oldType.is(LogicalTypeRoot.UNRESOLVED)) { + return false; + } + + if (newType instanceof StructuredType && oldType instanceof StructuredType) { + StructuredType newStructuredType = (StructuredType) newType; + StructuredType oldStructuredType = (StructuredType) oldType; + if (newStructuredType.getSuperType().isPresent() + != oldStructuredType.getSuperType().isPresent()) { + return true; + } + if (newStructuredType.getSuperType().isPresent() + && hasNullabilityNarrowing( + newStructuredType.getSuperType().get(), + oldStructuredType.getSuperType().get())) { + return true; + } + } + + List newChildren = newType.getChildren(); + List 
oldChildren = oldType.getChildren(); + if (newChildren.size() != oldChildren.size()) { + return true; + } + for (int i = 0; i < newChildren.size(); i++) { + if (hasNullabilityNarrowing(newChildren.get(i), oldChildren.get(i))) { + return true; + } + } + return false; + } + + private static class NullabilityNormalizer extends LogicalTypeDuplicator { + + @Override + public LogicalType visit(ArrayType arrayType) { + return super.visit(arrayType).copy(true); + } + + @Override + public LogicalType visit(MultisetType multisetType) { + return super.visit(multisetType).copy(true); + } + + @Override + public LogicalType visit(MapType mapType) { + return super.visit(mapType).copy(true); + } + + @Override + public LogicalType visit(RowType rowType) { + return super.visit(rowType).copy(true); + } + + @Override + public LogicalType visit(DistinctType distinctType) { + return super.visit(distinctType).copy(true); + } + + @Override + public LogicalType visit(StructuredType structuredType) { + return super.visit(structuredType).copy(true); + } + + @Override + protected LogicalType defaultMethod(LogicalType logicalType) { + return logicalType.copy(true); + } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java new file mode 100644 index 0000000000000..dcb90739c51a5 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SerializerCheckUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; + +/** Utils for serializer compatibility checks. */ +public class SerializerCheckUtils { + + public static boolean isSerializerCompatibleAfterNullabilityWidening( + TypeSerializer serializer, TypeSerializer previousSerializer) { + if (serializer.equals(previousSerializer)) { + return true; + } + + TypeSerializerSchemaCompatibility compatibility = + resolveSchemaCompatibility(serializer, previousSerializer); + return compatibility.isCompatibleAsIs(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static TypeSerializerSchemaCompatibility resolveSchemaCompatibility( + TypeSerializer serializer, TypeSerializer previousSerializer) { + TypeSerializerSnapshot previousSnapshot = previousSerializer.snapshotConfiguration(); + return serializer.snapshotConfiguration().resolveSchemaCompatibility(previousSnapshot); + } + + private SerializerCheckUtils() {} +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java deleted file mode 100644 index 513801ac98a2d..0000000000000 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.typeutils; - -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.VarCharType; - -import org.junit.jupiter.api.Test; - -import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; -import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; -import static 
org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link RowDataSerializerSnapshot}. */ -class RowDataSerializerSnapshotTest { - - @Test - void sameTypesIsCompatibleAsIs() throws Exception { - RowDataSerializer previous = - new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); - RowDataSerializer current = - new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); - } - - /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. */ - @Test - void wideningFieldNullabilityIsCompatible() throws Exception { - RowDataSerializer previous = - new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); - RowDataSerializer current = - new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); - } - - /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ - @Test - void narrowingFieldNullabilityIsIncompatible() throws Exception { - RowDataSerializer previous = - new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); - RowDataSerializer current = - new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - /** Mixed nullability changes should be incompatible when any field narrows. 
*/ - @Test - void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { - RowDataSerializer previous = - new RowDataSerializer(new IntType(false), new VarCharType(true, 100)); - // INT widened, VARCHAR narrowed - RowDataSerializer current = - new RowDataSerializer(new IntType(true), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - @Test - void differentFieldCountIsIncompatible() throws Exception { - RowDataSerializer previous = new RowDataSerializer(new IntType(false)); - RowDataSerializer current = - new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - @Test - void differentFieldTypeIsIncompatible() throws Exception { - RowDataSerializer previous = new RowDataSerializer(new IntType(false)); - RowDataSerializer current = new RowDataSerializer(new BigIntType(false)); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - /** - * Nullability changes in nested row fields (even widening) are not supported and should be - * incompatible. 
- */ - @Test - void nullabilityChangeInNestedRowFieldIsIncompatible() throws Exception { - RowType nestedNotNull = - RowType.of( - new LogicalType[] {new IntType(false), new VarCharType(false, 100)}, - new String[] {"a", "b"}); - RowType nestedNullable = - RowType.of( - new LogicalType[] {new IntType(true), new VarCharType(false, 100)}, - new String[] {"a", "b"}); - - RowDataSerializer previous = new RowDataSerializer(new LogicalType[] {nestedNotNull}); - RowDataSerializer current = new RowDataSerializer(new LogicalType[] {nestedNullable}); - - assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); - } - - // ------------------------------------------------------------------------------------------- - // Helpers - // ------------------------------------------------------------------------------------------- - - /** - * Round-trips the previous serializer's snapshot through serialization (mirroring what happens - * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. 
- */ - private static TypeSerializerSchemaCompatibility resolveCompatibility( - RowDataSerializer previous, RowDataSerializer current) throws Exception { - TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); - - DataOutputSerializer out = new DataOutputSerializer(64); - previousSnapshot.writeSnapshot(out); - - TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); - restoredPreviousSnapshot.readSnapshot( - previousSnapshot.getCurrentVersion(), - new DataInputDeserializer(out.getCopyOfBuffer()), - RowDataSerializerSnapshotTest.class.getClassLoader()); - - return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); - } -} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java new file mode 100644 index 0000000000000..3ddb20c989174 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for serializer snapshots. 
*/ +class SerializerSnapshotTest { + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void sameTypesIsCompatibleAsIs(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new IntType(false))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void differentLogicalTypeIsIncompatible(SerializerSpec serializerSpec) throws Exception { + assertThat(resolveCompatibility(serializerSpec, new IntType(false), new BigIntType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void wideningNullabilityInNestedRowIsCompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(false), nestedRowType(true))) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerSpecs") + void narrowingNullabilityInNestedRowIsIncompatible(SerializerSpec serializerSpec) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, nestedRowType(true), nestedRowType(false))) + .is(isIncompatible()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void wideningNullabilityInCascadingTypeIsCompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, previousType, currentType)) + .is(isCompatibleAsIs()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("serializerCascadingTypes") + void narrowingNullabilityInCascadingTypeIsIncompatible( + SerializerSpec serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + assertThat(resolveCompatibility(serializerSpec, currentType, previousType)) + .is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + 
// Helpers + // ------------------------------------------------------------------------------------------- + + /** + * Round-trips the previous serializer's snapshot through serialization and resolves + * compatibility against the current serializer's snapshot. + */ + private static <T> TypeSerializerSchemaCompatibility<T> resolveCompatibility( + SerializerSpec<T> serializerSpec, LogicalType previousType, LogicalType currentType) + throws Exception { + TypeSerializer<T> previous = serializerSpec.create(previousType); + TypeSerializer<T> current = serializerSpec.create(currentType); + TypeSerializerSnapshot<T> previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot<T> restoredPreviousSnapshot = serializerSpec.createSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + SerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } + + private static Stream<Arguments> serializerCascadingTypes() { + return serializerSpecs() + .flatMap( + serializerSpec -> + cascadingLogicalTypes() + .map(typeSpec -> typeSpec.toArguments(serializerSpec))); + } + + private static Stream<SerializerSpec<?>> serializerSpecs() { + return Stream.of( + new SerializerSpec<>( + "ARRAY", + ArrayDataSerializer::new, + ArrayDataSerializer.ArrayDataSerializerSnapshot::new), + new SerializerSpec<>( + "ROW", + RowDataSerializer::new, + RowDataSerializer.RowDataSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_KEY", + SerializerSnapshotTest::mapKeySerializer, + MapDataSerializer.MapDataSerializerSnapshot::new), + new SerializerSpec<>( + "MAP_VALUE", + SerializerSnapshotTest::mapValueSerializer, + MapDataSerializer.MapDataSerializerSnapshot::new)); + } + + private static MapDataSerializer mapKeySerializer(LogicalType keyType) { + return new 
MapDataSerializer(keyType, new IntType(false)); + } + + private static MapDataSerializer mapValueSerializer(LogicalType valueType) { + return new MapDataSerializer(new VarCharType(false, 5), valueType); + } + + private static Stream<CascadingLogicalTypeSpec> cascadingLogicalTypes() { + return Stream.of( + // Row with every nested nullability changed. + new CascadingLogicalTypeSpec(cascadingRowType(false), cascadingRowType(true)), + // Row with only leaf nullability changed. + new CascadingLogicalTypeSpec( + cascadingRowTypeWithLeafNullability(false), + cascadingRowTypeWithLeafNullability(true)), + // Array with cascading element type nullability changed. + new CascadingLogicalTypeSpec(cascadingArrayType(false), cascadingArrayType(true)), + // Map with cascading key and value type nullability changed. + new CascadingLogicalTypeSpec(cascadingMapType(false), cascadingMapType(true)), + // Multiset with cascading element type nullability changed. + new CascadingLogicalTypeSpec( + cascadingMultisetType(false), cascadingMultisetType(true)), + // Structured type with cascading attribute type nullability changed. + new CascadingLogicalTypeSpec( + cascadingStructuredType(false), cascadingStructuredType(true)), + // Distinct type with cascading source type nullability changed. 
+ new CascadingLogicalTypeSpec( + cascadingDistinctType(false), cascadingDistinctType(true))); + } + + private static RowType nestedRowType(boolean isNullable) { + return RowType.of( + new LogicalType[] {new IntType(isNullable), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + } + + private static RowType cascadingRowType(boolean isNullable) { + return new RowType( + isNullable, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + isNullable, + new VarCharType(isNullable, 5), + new RowType( + isNullable, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + isNullable, + new RowType( + isNullable, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static RowType cascadingRowTypeWithLeafNullability(boolean isNullable) { + return new RowType( + false, + Arrays.asList( + new RowField( + "arrayField", + new ArrayType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "element", + new IntType(isNullable)))))), + new RowField( + "mapField", + new MapType( + false, + new VarCharType(false, 5), + new RowType( + false, + Arrays.asList( + new RowField( + "value", + new BigIntType(isNullable)))))), + new RowField( + "multisetField", + new MultisetType( + false, + new RowType( + false, + Arrays.asList( + new RowField( + "nested", + new BooleanType(isNullable)))))))); + } + + private static ArrayType cascadingArrayType(boolean isNullable) { + return new ArrayType(isNullable, cascadingRowType(isNullable)); + } + + private static MapType cascadingMapType(boolean isNullable) { + return new MapType( + isNullable, new VarCharType(isNullable, 5), cascadingRowType(isNullable)); + } + + private static MultisetType cascadingMultisetType(boolean 
isNullable) { + return new MultisetType(isNullable, cascadingRowType(isNullable)); + } + + private static StructuredType cascadingStructuredType(boolean isNullable) { + return StructuredType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingStructuredType")) + .attributes( + Arrays.asList( + new StructuredAttribute("field", cascadingArrayType(isNullable)))) + .setNullable(isNullable) + .build(); + } + + private static DistinctType cascadingDistinctType(boolean isNullable) { + return DistinctType.newBuilder( + ObjectIdentifier.of("cat", "db", "CascadingDistinctType"), + cascadingRowType(isNullable)) + .build(); + } + + private static final class SerializerSpec<T> { + private final String name; + private final Function<LogicalType, TypeSerializer<T>> serializerFactory; + private final Supplier<TypeSerializerSnapshot<T>> snapshotFactory; + + private SerializerSpec( + String name, + Function<LogicalType, TypeSerializer<T>> serializerFactory, + Supplier<TypeSerializerSnapshot<T>> snapshotFactory) { + this.name = name; + this.serializerFactory = serializerFactory; + this.snapshotFactory = snapshotFactory; + } + + private TypeSerializer<T> create(LogicalType type) { + return serializerFactory.apply(type); + } + + private TypeSerializerSnapshot<T> createSnapshot() { + return snapshotFactory.get(); + } + + @Override + public String toString() { + return name; + } + } + + private static final class CascadingLogicalTypeSpec { + private final LogicalType previousType; + private final LogicalType currentType; + + private CascadingLogicalTypeSpec(LogicalType previousType, LogicalType currentType) { + this.previousType = previousType; + this.currentType = currentType; + } + + private Arguments toArguments(SerializerSpec<?> serializerSpec) { + return Arguments.of(serializerSpec, previousType, currentType); + } + } +} diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java index bf9783789fe28..7af53425ffd40 
100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java @@ -43,6 +43,9 @@ import java.lang.reflect.Array; import java.util.Arrays; +import static org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** Serializer for {@link ArrayData}. */ @Internal public class ArrayDataSerializer extends TypeSerializer<ArrayData> { @@ -333,12 +336,14 @@ public TypeSerializerSchemaCompatibility<ArrayData> resolveSchemaCompatibility( ArrayDataSerializerSnapshot oldArrayDataSerializerSnapshot = (ArrayDataSerializerSnapshot) oldSerializerSnapshot; - if (!eleType.equals(oldArrayDataSerializerSnapshot.eleType) - || !eleSer.equals(oldArrayDataSerializerSnapshot.eleSer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + eleType, oldArrayDataSerializerSnapshot.eleType) + || !isSerializerCompatibleAfterNullabilityWidening( + eleSer, oldArrayDataSerializerSnapshot.eleSer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java index cd58a4bc0a3f2..f37c58fffad08 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java @@ -40,6 +40,9 @@ import java.io.IOException; +import static 
org.apache.flink.table.runtime.typeutils.SerializerCheckUtils.isSerializerCompatibleAfterNullabilityWidening; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening; + /** Serializer for {@link MapData}. */ @Internal public class MapDataSerializer extends TypeSerializer<MapData> { @@ -323,14 +326,18 @@ public TypeSerializerSchemaCompatibility<MapData> resolveSchemaCompatibility( MapDataSerializerSnapshot previousMapDataSerializerSnapshot = (MapDataSerializerSnapshot) oldSerializerSnapshot; - if (!keyType.equals(previousMapDataSerializerSnapshot.keyType) - || !valueType.equals(previousMapDataSerializerSnapshot.valueType) - || !keySerializer.equals(previousMapDataSerializerSnapshot.keySerializer) - || !valueSerializer.equals(previousMapDataSerializerSnapshot.valueSerializer)) { + + if (!isTypeCompatibleAfterNullabilityWidening( + keyType, previousMapDataSerializerSnapshot.keyType) + || !isTypeCompatibleAfterNullabilityWidening( + valueType, previousMapDataSerializerSnapshot.valueType) + || !isSerializerCompatibleAfterNullabilityWidening( + keySerializer, previousMapDataSerializerSnapshot.keySerializer) + || !isSerializerCompatibleAfterNullabilityWidening( + valueSerializer, previousMapDataSerializerSnapshot.valueSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); - } else { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } } diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java index 8d419f15212f4..585406f310ff4 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java +++ 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java @@ -44,6 +44,8 @@ import java.util.Arrays; import java.util.stream.IntStream; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.areTypesCompatibleAfterNullabilityWidening; + /** Serializer for {@link RowData}. */ @Internal public class RowDataSerializer extends AbstractRowDataSerializer<RowData> { @@ -345,7 +347,7 @@ public TypeSerializerSchemaCompatibility<RowData> resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. - if (!typesAreCompatibleAfterNullabilityWidening( + if (!areTypesCompatibleAfterNullabilityWidening( types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -366,32 +368,5 @@ public TypeSerializerSchemaCompatibility<RowData> resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } - - /** - * Returns true when new field types are structurally equal to old ones ignoring top-level - * nullability, and no field narrows from nullable to non-nullable. - */ - private static boolean typesAreCompatibleAfterNullabilityWidening( - LogicalType[] newTypes, LogicalType[] oldTypes) { - if (newTypes == oldTypes) { - return true; - } - if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { - return false; - } - for (int i = 0; i < newTypes.length; i++) { - LogicalType newType = newTypes[i]; - LogicalType oldType = oldTypes[i]; - // structurally equal except (possibly) for top-level nullability - if (!newType.copy(true).equals(oldType.copy(true))) { - return false; - } - // reject narrowing: nullable -> non-nullable - if (oldType.isNullable() && !newType.isNullable()) { - return false; - } - } - return true; - } } }