metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java
new file mode 100644
index 0000000000000..2be3237d86760
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.AvroToVariantDataConverters.AvroToVariantDataConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.types.variant.Variant;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Deserialization schema from Avro bytes to {@link RowData} with a Variant physical column and
+ * optional metadata columns appended at the end. Wraps a {@link
+ * RegistryWriterAvroDeserializationSchema} that produces {@link GenericRecord}, then converts to
+ * Variant and assembles the output row.
+ *
+ * <p>Converter cache performance is better when combined with {@code CachedSchemaProvider}s that
+ * return the same {@link Schema} object reference for matching schemas.
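+ *
+ * <p>A minimal usage sketch (the {@code schemaCoderProvider}, {@code context} and {@code avroBytes}
+ * values below are illustrative assumptions, not part of this class):
+ *
+ * <pre>{@code
+ * AvroVariantDeserializationSchema deserializer =
+ *     new AvroVariantDeserializationSchema(
+ *         new RegistryWriterAvroDeserializationSchema(schemaCoderProvider),
+ *         false, // includeSchemaMetadata
+ *         100, // maxCacheSize
+ *         InternalTypeInfo.of(RowType.of(new VariantType())));
+ * deserializer.open(context);
+ * RowData row = deserializer.deserialize(avroBytes);
+ * Variant variant = row.getVariant(0);
+ * }</pre>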
+ */
+@Internal
+public class AvroVariantDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AvroVariantDeserializationSchema.class);
+
+ private final RegistryWriterAvroDeserializationSchema innerDeserializationSchema;
+ private final boolean includeSchemaMetadata;
+ private final int maxCacheSize;
+ private final TypeInformation<RowData> typeInfo;
+
+ transient Map<Schema, AvroToVariantDataConverter> converterCache;
+
+ public AvroVariantDeserializationSchema(
+ RegistryWriterAvroDeserializationSchema innerDeserializationSchema,
+ boolean includeSchemaMetadata,
+ int maxCacheSize,
+ TypeInformation<RowData> typeInfo) {
+ this.innerDeserializationSchema = Objects.requireNonNull(innerDeserializationSchema);
+ this.includeSchemaMetadata = includeSchemaMetadata;
+ this.maxCacheSize = maxCacheSize;
+ this.typeInfo = Objects.requireNonNull(typeInfo);
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ this.innerDeserializationSchema.open(context);
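+ // Bounded, insertion-ordered converter cache: once its size exceeds maxCacheSize,
+ // removeEldestEntry evicts the oldest converter.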
+ this.converterCache =
+ new LinkedHashMap<>(maxCacheSize) {
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<Schema, AvroToVariantDataConverter> eldest) {
+ if (size() > maxCacheSize) {
+ LOG.info(
+ "Evicting schema converter from cache, size exceeded {}",
+ maxCacheSize);
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public RowData deserialize(@Nullable byte[] message) throws IOException {
+ if (message == null) {
+ return null;
+ }
+
+ GenericRecord record = innerDeserializationSchema.deserialize(message);
+ if (record == null) {
+ return null;
+ }
+
+ GenericRowData row = new GenericRowData(includeSchemaMetadata ? 2 : 1);
+ Schema writerSchema = record.getSchema();
+ AvroToVariantDataConverter converter = getOrCreateConverter(writerSchema);
+ Variant variant = converter.convert(record);
+ row.setField(0, variant);
+
+ if (includeSchemaMetadata) {
+ row.setField(1, StringData.fromString(writerSchema.toString()));
+ }
+
+ return row;
+ }
+
+ private AvroToVariantDataConverter getOrCreateConverter(Schema writerSchema) {
+ return converterCache.computeIfAbsent(
+ writerSchema, AvroToVariantDataConverters::createVariantConverter);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return typeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AvroVariantDeserializationSchema that = (AvroVariantDeserializationSchema) o;
+ return includeSchemaMetadata == that.includeSchemaMetadata
+ && maxCacheSize == that.maxCacheSize
+ && innerDeserializationSchema.equals(that.innerDeserializationSchema)
+ && typeInfo.equals(that.typeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ includeSchemaMetadata, maxCacheSize, innerDeserializationSchema, typeInfo);
+ }
+}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java
new file mode 100644
index 0000000000000..21e5b16475c85
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using a {@link SchemaCoder} to
+ * resolve the writer schema dynamically per record. Unlike {@link
+ * RegistryAvroDeserializationSchema}, which projects records into a fixed reader schema, this
+ * class uses the writer schema as both reader and writer schema, preserving all fields as-is.
+ *
+ * <p>Designed for use cases where the Avro schema is not known at table creation time and varies
+ * per record.
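+ *
+ * <p>A minimal usage sketch (the {@code schemaCoderProvider}, {@code context} and {@code avroBytes}
+ * values below are illustrative assumptions):
+ *
+ * <pre>{@code
+ * RegistryWriterAvroDeserializationSchema deserializer =
+ *     new RegistryWriterAvroDeserializationSchema(schemaCoderProvider);
+ * deserializer.open(context);
+ * GenericRecord record = deserializer.deserialize(avroBytes);
+ * Schema writerSchema = record.getSchema();
+ * }</pre>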
+ */
+public class RegistryWriterAvroDeserializationSchema
+ implements DeserializationSchema<GenericRecord> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+ private transient SchemaCoder schemaCoder;
+ private transient MutableByteArrayInputStream inputStream;
+ private transient BinaryDecoder decoder;
+ private transient GenericDatumReader<GenericRecord> datumReader;
+
+ public RegistryWriterAvroDeserializationSchema(
+ SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+ this.schemaCoderProvider = Objects.requireNonNull(schemaCoderProvider);
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
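+ // GenericData is bound to the current context classloader, which in Flink is typically the
+ // user-code classloader.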
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ GenericData genericData = new GenericData(cl);
+ this.datumReader = new GenericDatumReader<>(null, null, genericData);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ this.schemaCoder = schemaCoderProvider.get();
+ }
+
+ @Override
+ public GenericRecord deserialize(@Nullable byte[] message) throws IOException {
+ if (message == null) {
+ return null;
+ }
+
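+ // The schema coder consumes the writer schema (e.g. a schema registry header) from the
+ // stream; the remaining bytes are decoded with that schema as both writer and expected
+ // reader schema, so no projection to a fixed reader schema takes place.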
+ inputStream.setBuffer(message);
+ Schema writerSchema = schemaCoder.readSchema(inputStream);
+ datumReader.setSchema(writerSchema);
+ datumReader.setExpected(writerSchema);
+
+ return datumReader.read(null, decoder);
+ }
+
+ @Override
+ public boolean isEndOfStream(GenericRecord nextElement) {
+ return false;
+ }
+
+ // The produced type is effectively unused: this schema is composed inside
+ // AvroVariantDeserializationSchema, which provides its own type information.
+ @Override
+ public TypeInformation<GenericRecord> getProducedType() {
+ return TypeInformation.of(GenericRecord.class);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RegistryWriterAvroDeserializationSchema that = (RegistryWriterAvroDeserializationSchema) o;
+ return schemaCoderProvider.equals(that.schemaCoderProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaCoderProvider);
+ }
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java
new file mode 100644
index 0000000000000..2ed3b5fbe4e8e
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.types.variant.Variant;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.fixedRoundRobinSchemaCoderProvider;
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class AvroVariantDeserializationSchemaTest {
+
+ private static final Address ADDRESS = TestDataGenerator.generateRandomAddress(new Random());
+ private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();
+ private static final Schema ADDRESS_SCHEMA_V2 =
+ new Schema.Parser()
+ .parse(
+ "{\"namespace\": \"org.apache.flink.formats.avro.generated\","
+ + " \"type\": \"record\","
+ + " \"name\": \"Address\","
+ + " \"fields\": ["
+ + " {\"name\": \"num\", \"type\": \"int\"},"
+ + " {\"name\": \"street\", \"type\": \"string\"},"
+ + " {\"name\": \"city\", \"type\": \"string\"},"
+ + " {\"name\": \"state\", \"type\": \"string\"},"
+ + " {\"name\": \"zip\", \"type\": \"string\"},"
+ + " {\"name\": \"country\", \"type\": \"string\"}"
+ + " ]}");
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testDeserialization(boolean includeSchemaMetadata) throws Exception {
+ RowType rowType =
+ includeSchemaMetadata
+ ? RowType.of(
+ new VariantType(), new VarCharType(true, VarCharType.MAX_LENGTH))
+ : RowType.of(new VariantType());
+ AvroVariantDeserializationSchema deserializer =
+ new AvroVariantDeserializationSchema(
+ new RegistryWriterAvroDeserializationSchema(
+ fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA)),
+ includeSchemaMetadata,
+ 10,
+ InternalTypeInfo.of(rowType));
+ deserializer.open(new DummyInitializationContext());
+
+ RowData rowData = deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA));
+ assertThat(rowData.getArity()).isEqualTo(includeSchemaMetadata ? 2 : 1);
+
+ Variant variant = rowData.getVariant(0);
+ assertThat(variant.getField("num").getInt()).isEqualTo(ADDRESS.getNum());
+ assertThat(variant.getField("street").getString()).isEqualTo(ADDRESS.getStreet());
+ assertThat(variant.getField("city").getString()).isEqualTo(ADDRESS.getCity());
+ assertThat(variant.getField("state").getString()).isEqualTo(ADDRESS.getState());
+ assertThat(variant.getField("zip").getString()).isEqualTo(ADDRESS.getZip());
+
+ if (includeSchemaMetadata) {
+ assertThat(rowData.getString(1).toString()).isEqualTo(ADDRESS_SCHEMA.toString());
+ }
+ }
+
+ @Test
+ void testConverterCacheEviction() throws Exception {
+ GenericRecord v2Record = new GenericData.Record(ADDRESS_SCHEMA_V2);
+ v2Record.put("num", ADDRESS.getNum());
+ v2Record.put("street", ADDRESS.getStreet());
+ v2Record.put("city", ADDRESS.getCity());
+ v2Record.put("state", ADDRESS.getState());
+ v2Record.put("zip", ADDRESS.getZip());
+ v2Record.put("country", "US");
+
+ AvroVariantDeserializationSchema deserializer =
+ new AvroVariantDeserializationSchema(
+ new RegistryWriterAvroDeserializationSchema(
+ fixedRoundRobinSchemaCoderProvider(
+ ADDRESS_SCHEMA, ADDRESS_SCHEMA_V2)),
+ false,
+ 1,
+ InternalTypeInfo.of(RowType.of(new VariantType())));
+ deserializer.open(new DummyInitializationContext());
+
+ deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA));
+ assertThat(deserializer.converterCache).hasSize(1);
+ assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA);
+
+ // ADDRESS_SCHEMA_V2 triggers eviction of ADDRESS_SCHEMA (maxCacheSize=1)
+ deserializer.deserialize(writeRecord(v2Record, ADDRESS_SCHEMA_V2));
+ assertThat(deserializer.converterCache).hasSize(1);
+ assertThat(deserializer.converterCache).doesNotContainKey(ADDRESS_SCHEMA);
+ assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA_V2);
+
+ // ADDRESS_SCHEMA re-enters the cache, deserialization still works
+ deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA));
+ assertThat(deserializer.converterCache).hasSize(1);
+ assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA);
+ }
+
+ @Test
+ void testDeserializeNull() throws Exception {
+ AvroVariantDeserializationSchema deserializer =
+ new AvroVariantDeserializationSchema(
+ new RegistryWriterAvroDeserializationSchema(
+ fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA)),
+ false,
+ 10,
+ InternalTypeInfo.of(RowType.of(new VariantType())));
+ deserializer.open(new DummyInitializationContext());
+
+ assertThat(deserializer.deserialize(null)).isNull();
+ }
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java
new file mode 100644
index 0000000000000..c00883353bebf
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.fixedRoundRobinSchemaCoderProvider;
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class RegistryWriterAvroDeserializationSchemaTest {
+
+ private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
+ private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();
+
+ @Test
+ void testGenericRecordReadWithFullSchema() throws Exception {
+ RegistryWriterAvroDeserializationSchema deserializer =
+ new RegistryWriterAvroDeserializationSchema(
+ fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA));
+
+ deserializer.open(new DummyInitializationContext());
+
+ GenericRecord record = deserializer.deserialize(writeRecord(address, ADDRESS_SCHEMA));
+ assertThat(record.getSchema()).isSameAs(ADDRESS_SCHEMA);
+ assertThat(record.get("num")).isEqualTo(address.getNum());
+ assertThat(record.get("street").toString()).isEqualTo(address.getStreet());
+ assertThat(record.get("city").toString()).isEqualTo(address.getCity());
+ assertThat(record.get("state").toString()).isEqualTo(address.getState());
+ assertThat(record.get("zip").toString()).isEqualTo(address.getZip());
+ }
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index b4e12f8ca10ee..ec76f65d993b6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
+import org.apache.flink.formats.avro.SchemaCoder;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
@@ -42,6 +43,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -55,6 +57,8 @@
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/** Utilities for creating Avro Schemas. */
public final class AvroTestUtils {
@@ -397,4 +401,25 @@ public static Encoder createEncoder(
return EncoderFactory.get().binaryEncoder(outputStream, null);
}
}
+
+ public static SchemaCoder.SchemaCoderProvider fixedRoundRobinSchemaCoderProvider(
+ Schema... schemas) {
+ return fixedRoundRobinSchemaCoderProvider(Arrays.asList(schemas));
+ }
+
+ public static SchemaCoder.SchemaCoderProvider fixedRoundRobinSchemaCoderProvider(
+ List<Schema> schemas) {
+ return () -> {
+ AtomicInteger index = new AtomicInteger(0);
+ return new SchemaCoder() {
+ @Override
+ public Schema readSchema(InputStream in) {
+ return schemas.get(index.getAndIncrement() % schemas.size());
+ }
+
+ @Override
+ public void writeSchema(Schema schema, OutputStream out) throws IOException {}
+ };
+ };
+ }
}