diff --git a/docs/content/docs/connectors/table/formats/avro-variant-confluent.md b/docs/content/docs/connectors/table/formats/avro-variant-confluent.md new file mode 100644 index 0000000000000..52daa945032a5 --- /dev/null +++ b/docs/content/docs/connectors/table/formats/avro-variant-confluent.md @@ -0,0 +1,271 @@ +--- +title: Confluent Avro Variant +weight: 5 +type: docs +--- + + +# Confluent Avro Variant Format + +{{< label "Format: Deserialization Schema" >}} + +The Avro Variant Schema Registry (``avro-variant-confluent``) format allows you to read records that were serialized by the ``io.confluent.kafka.serializers.KafkaAvroSerializer`` and deserialize them into Flink's ``VARIANT`` type. + +Unlike the [Confluent Avro]({{< ref "docs/connectors/table/formats/avro-confluent" >}}) format which requires the table schema to match the Avro schema, this format reads the entire Avro record into a single ``VARIANT`` column. The writer schema is resolved dynamically per record from the configured Confluent Schema Registry based on the schema version id encoded in the record. The Avro schema does not need to be known at table creation time and can evolve across records. + +This format is deserialization-only and does not support writing. + +Dependencies +------------ + +{{< sql_download_table "avro-confluent" >}} + +For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent's maven repository at `https://packages.confluent.io/maven/` is configured in your project's build files. + +How to read with Avro-Variant-Confluent format +-------------- + +This format is particularly useful when a single Kafka topic carries multiple event types with different Avro schemas. With the standard ``avro-confluent`` format, you would need a separate table for each event type, each with a fixed schema. With ``avro-variant-confluent``, a single table can ingest all event types and you can route or filter them at query time. 
Also, can be useful when reading from multiple Kafka topics with topic pattern. + +Example of a table that reads Avro records from Kafka into a VARIANT column: + +```sql +CREATE TABLE kafka_source ( + data VARIANT +) WITH ( + 'connector' = 'kafka', + 'topic' = 'user_events', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'my-group', + 'format' = 'avro-variant-confluent', + 'avro-variant-confluent.url' = 'http://localhost:8081' +); + +-- Access variant fields using bracket notation +SELECT + data['id'] AS id, + data['name'] AS name, + data['email'] AS email +FROM kafka_source; +``` + +--- + +Example of reading from multiple topics with different Avro schemas using a topic pattern: + +```sql +-- Read from multiple topics with different Avro schemas using a single table +CREATE TABLE all_orders ( + data VARIANT +) WITH ( + 'connector' = 'kafka', + 'topic-pattern' = 'orders-*', + .. + 'format' = 'avro-variant-confluent', + 'avro-variant-confluent.url' = 'http://localhost:8081' +); + +-- Query across all order topics regardless of schema differences +SELECT + data['order_id'] AS order_id, + data['customer_id'] AS customer_id, + data['total'] AS total, + data['region'] AS region +FROM all_orders; +``` + +--- + +The format also supports an optional ``schema`` metadata column that exposes the Avro writer schema string for each record: + +```sql +CREATE TABLE kafka_source ( + data VARIANT, + avro_schema STRING METADATA FROM 'schema' +) WITH ( + 'connector' = 'kafka', + 'topic' = 'user_events', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'my-group', + 'format' = 'avro-variant-confluent', + 'avro-variant-confluent.url' = 'http://localhost:8081' +); + +-- Query fields and inspect the writer schema +SELECT + data['id'] AS id, + data['name'] AS name, + avro_schema +FROM kafka_source; +``` + +Format Options +---------------- + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what format to use, here should be <code>'avro-variant-confluent'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.url</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The URL of the Confluent Schema Registry to fetch schemas.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.properties</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Map</td>
+      <td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.ssl.keystore.location</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Location / File of SSL keystore</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.ssl.keystore.password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password for SSL keystore</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.ssl.truststore.location</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Location / File of SSL truststore</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.ssl.truststore.password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password for SSL truststore</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.basic-auth.credentials-source</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Basic auth credentials source for Schema Registry</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.basic-auth.user-info</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Basic auth user info for Schema Registry</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.bearer-auth.credentials-source</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Bearer auth credentials source for Schema Registry</td>
+    </tr>
+    <tr>
+      <td><h5>avro-variant-confluent.bearer-auth.token</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Bearer auth token for Schema Registry</td>
+    </tr>
+    </tbody>
+</table>
+
+Metadata
+--------
+
+The format supports the following metadata column:
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 45%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>schema</code></td>
+      <td><code>STRING NOT NULL</code></td>
+      <td>The Avro writer schema string (JSON format) used to serialize the record.</td>
+    </tr>
+    </tbody>
+</table>
+ +Data Type Mapping +---------------- + +All Avro types are converted to Flink's VARIANT type. The following Avro logical types receive special handling: + +| Avro Type | Avro Logical Type | Variant Representation | +|-----------|-------------------|----------------------| +| INT | date | DATE | +| INT | time-millis | LONG (microseconds) | +| LONG | timestamp-millis | TIMESTAMP | +| LONG | timestamp-micros | TIMESTAMP | +| BYTES/FIXED | decimal | DECIMAL | +| RECORD | - | OBJECT | +| ARRAY | - | ARRAY | +| MAP | - | OBJECT (string keys) | +| UNION (null + type) | - | nullable inner type | +| STRING, ENUM | - | STRING | +| BOOLEAN | - | BOOLEAN | +| INT | - | INT | +| LONG | - | LONG | +| FLOAT | - | FLOAT | +| DOUBLE | - | DOUBLE | +| BYTES, FIXED | - | BYTES | +| NULL | - | NULL | + diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactory.java new file mode 100644 index 0000000000000..8d93c31998b90 --- /dev/null +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroVariantDecodingFormat; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD; +import static 
org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL; + +/** + * Table format factory for providing configured instances of Confluent Schema Registry Avro to + * Variant {@link DeserializationSchema}. Deserialization only — no serialization support. + */ +@Internal +public class RegistryAvroVariantFormatFactory implements DeserializationFormatFactory { + + public static final String IDENTIFIER = "avro-variant-confluent"; + + private static final int SCHEMA_CACHE_CAPACITY = 1000; + + @Override + public DecodingFormat> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + String schemaRegistryURL = formatOptions.get(URL); + Map optionalPropertiesMap = + RegistryAvroFormatFactory.buildOptionalPropertiesMap(formatOptions); + + return new AvroVariantDecodingFormat( + new CachedSchemaCoderProvider( + null, schemaRegistryURL, SCHEMA_CACHE_CAPACITY, optionalPropertiesMap), + SCHEMA_CACHE_CAPACITY); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(URL); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PROPERTIES); + options.add(SSL_KEYSTORE_LOCATION); + options.add(SSL_KEYSTORE_PASSWORD); + options.add(SSL_TRUSTSTORE_LOCATION); + options.add(SSL_TRUSTSTORE_PASSWORD); + options.add(BASIC_AUTH_CREDENTIALS_SOURCE); + options.add(BASIC_AUTH_USER_INFO); + options.add(BEARER_AUTH_CREDENTIALS_SOURCE); + options.add(BEARER_AUTH_TOKEN); + return options; + } + + @Override + public Set> forwardOptions() { + Set> options = new 
HashSet<>(); + options.add(URL); + options.add(PROPERTIES); + options.add(SSL_KEYSTORE_LOCATION); + options.add(SSL_KEYSTORE_PASSWORD); + options.add(SSL_TRUSTSTORE_LOCATION); + options.add(SSL_TRUSTSTORE_PASSWORD); + options.add(BASIC_AUTH_CREDENTIALS_SOURCE); + options.add(BASIC_AUTH_USER_INFO); + options.add(BEARER_AUTH_CREDENTIALS_SOURCE); + options.add(BEARER_AUTH_TOKEN); + return options; + } +} diff --git a/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index e1b78a27cbba7..d599995e7d76a 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -15,3 +15,4 @@ org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory +org.apache.flink.formats.avro.registry.confluent.RegistryAvroVariantFormatFactory diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantDeserializationSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantDeserializationSchemaTest.java new file mode 100644 index 0000000000000..61e520bffd22e --- /dev/null +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantDeserializationSchemaTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroVariantDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroSerializationSchema; +import org.apache.flink.formats.avro.RegistryWriterAvroDeserializationSchema; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.formats.avro.utils.TestDataGenerator; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VariantType; +import org.apache.flink.types.variant.Variant; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link AvroVariantDeserializationSchema} with Confluent Schema Registry wire format + * using {@link MockSchemaRegistryClient}. + */ +class RegistryAvroVariantDeserializationSchemaTest { + + private static final Schema ADDRESS_SCHEMA = Address.getClassSchema(); + private static final Schema ADDRESS_SCHEMA_V2 = + new Schema.Parser() + .parse( + "{\"namespace\": \"org.apache.flink.formats.avro.generated\"," + + " \"type\": \"record\"," + + " \"name\": \"Address\"," + + " \"fields\": [" + + " {\"name\": \"num\", \"type\": \"int\"}," + + " {\"name\": \"street\", \"type\": \"string\"}," + + " {\"name\": \"city\", \"type\": \"string\"}," + + " {\"name\": \"state\", \"type\": \"string\"}," + + " {\"name\": \"zip\", \"type\": \"string\"}," + + " {\"name\": \"country\", \"type\": \"string\"}" + + " ]}"); + private static final String SUBJECT = "address-value"; + private static final int SCHEMA_CACHE_CAPACITY = 1000; + + private static SchemaRegistryClient client; + private Address address; + + @BeforeAll + static void beforeClass() { + client = new MockSchemaRegistryClient(); + } + + @BeforeEach + void before() { + this.address = TestDataGenerator.generateRandomAddress(new Random()); + } + + @AfterEach + void after() throws IOException, RestClientException { + client.deleteSubject(SUBJECT); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAvroVariantDeserialization(boolean includeSchemaMetadata) throws Exception { + DataType 
dataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA.toString()); + RowType rowType = (RowType) dataType.getLogicalType(); + + AvroRowDataSerializationSchema serializer = getSerializationSchema(rowType, ADDRESS_SCHEMA); + serializer.open(null); + + RowData input = address2RowData(address); + byte[] serialized = serializer.serialize(input); + + AvroVariantDeserializationSchema deserializer = + getAvroVariantDeserializationSchema(includeSchemaMetadata); + deserializer.open(null); + + RowData rowData = deserializer.deserialize(serialized); + assertThat(rowData).isNotNull(); + assertThat(rowData.getArity()).isEqualTo(includeSchemaMetadata ? 2 : 1); + + Variant variant = rowData.getVariant(0); + assertThat(variant.getField("num").getInt()).isEqualTo(address.getNum()); + assertThat(variant.getField("street").getString()).isEqualTo(address.getStreet()); + assertThat(variant.getField("city").getString()).isEqualTo(address.getCity()); + assertThat(variant.getField("state").getString()).isEqualTo(address.getState()); + assertThat(variant.getField("zip").getString()).isEqualTo(address.getZip()); + + if (includeSchemaMetadata) { + assertThat(rowData.getString(1).toString()).isEqualTo(ADDRESS_SCHEMA.toString()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSchemaEvolution(boolean includeSchemaMetadata) throws Exception { + DataType v1DataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA.toString()); + RowType v1RowType = (RowType) v1DataType.getLogicalType(); + AvroRowDataSerializationSchema v1Serializer = + getSerializationSchema(v1RowType, ADDRESS_SCHEMA); + v1Serializer.open(null); + byte[] v1Bytes = v1Serializer.serialize(address2RowData(address)); + + DataType v2DataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA_V2.toString()); + RowType v2RowType = (RowType) v2DataType.getLogicalType(); + AvroRowDataSerializationSchema v2Serializer = + getSerializationSchema(v2RowType, ADDRESS_SCHEMA_V2); + 
v2Serializer.open(null); + byte[] v2Bytes = v2Serializer.serialize(address2RowData(address, "US")); + + AvroVariantDeserializationSchema deserializer = + getAvroVariantDeserializationSchema(includeSchemaMetadata); + deserializer.open(null); + + RowData v1Row = deserializer.deserialize(v1Bytes); + Variant variant1 = v1Row.getVariant(0); + assertThat(variant1.getField("num").getInt()).isEqualTo(address.getNum()); + assertThat(variant1.getField("street").getString()).isEqualTo(address.getStreet()); + assertThat(variant1.getField("city").getString()).isEqualTo(address.getCity()); + assertThat(variant1.getField("state").getString()).isEqualTo(address.getState()); + assertThat(variant1.getField("zip").getString()).isEqualTo(address.getZip()); + + if (includeSchemaMetadata) { + assertThat(v1Row.getString(1).toString()).isEqualTo(ADDRESS_SCHEMA.toString()); + } + + RowData v2Row = deserializer.deserialize(v2Bytes); + Variant variant2 = v2Row.getVariant(0); + assertThat(variant2.getField("num").getInt()).isEqualTo(address.getNum()); + assertThat(variant2.getField("street").getString()).isEqualTo(address.getStreet()); + assertThat(variant2.getField("city").getString()).isEqualTo(address.getCity()); + assertThat(variant2.getField("state").getString()).isEqualTo(address.getState()); + assertThat(variant2.getField("zip").getString()).isEqualTo(address.getZip()); + assertThat(variant2.getField("country").getString()).isEqualTo("US"); + + if (includeSchemaMetadata) { + assertThat(v2Row.getString(1).toString()).isEqualTo(ADDRESS_SCHEMA_V2.toString()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static AvroRowDataSerializationSchema getSerializationSchema( + RowType rowType, Schema avroSchema) { + ConfluentSchemaRegistryCoder registryCoder = + new ConfluentSchemaRegistryCoder(SUBJECT, client); + return new 
AvroRowDataSerializationSchema( + rowType, + new RegistryAvroSerializationSchema( + GenericRecord.class, avroSchema, () -> registryCoder), + RowDataToAvroConverters.createConverter(rowType)); + } + + private static AvroVariantDeserializationSchema getAvroVariantDeserializationSchema( + boolean includeSchemaMetadata) { + ConfluentSchemaRegistryCoder registryCoder = new ConfluentSchemaRegistryCoder(client); + RowType rowType = + includeSchemaMetadata + ? RowType.of( + new VariantType(), new VarCharType(true, VarCharType.MAX_LENGTH)) + : RowType.of(new VariantType()); + return new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema(() -> registryCoder), + includeSchemaMetadata, + SCHEMA_CACHE_CAPACITY, + InternalTypeInfo.of(rowType)); + } + + private static RowData address2RowData(Address address, String... extraFields) { + GenericRowData rowData = new GenericRowData(5 + extraFields.length); + rowData.setField(0, address.getNum()); + rowData.setField(1, new BinaryStringData(address.getStreet().toString())); + rowData.setField(2, new BinaryStringData(address.getCity().toString())); + rowData.setField(3, new BinaryStringData(address.getState().toString())); + rowData.setField(4, new BinaryStringData(address.getZip().toString())); + for (int i = 0; i < extraFields.length; i++) { + rowData.setField(5 + i, new BinaryStringData(extraFields[i])); + } + return rowData; + } +} diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactoryTest.java new file mode 100644 index 0000000000000..1608c8c00c5f3 --- /dev/null +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroVariantFormatFactoryTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroVariantDeserializationSchema; +import org.apache.flink.formats.avro.RegistryWriterAvroDeserializationSchema; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.TestDynamicTableFactory; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VariantType; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import 
static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class RegistryAvroVariantFormatFactoryTest { + + private static final ResolvedSchema SCHEMA = + ResolvedSchema.of(Column.physical("data", DataTypes.VARIANT())); + + private static final String REGISTRY_URL = "http://localhost:8081"; + private static final int SCHEMA_CACHE_CAPACITY = 1000; + + @Test + void testDecodingFormat() { + final AvroVariantDeserializationSchema expectedDeser = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + new CachedSchemaCoderProvider( + null, REGISTRY_URL, SCHEMA_CACHE_CAPACITY, null)), + false, + SCHEMA_CACHE_CAPACITY, + InternalTypeInfo.of(RowType.of(new VariantType()))); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions()); + assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class); + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + DeserializationSchema actualDeser = + scanSourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType()); + + assertThat(actualDeser).isEqualTo(expectedDeser); + } + + @Test + void testDecodingFormatWithOptionalProperties() { + Map expectedProperties = getExpectedProperties(); + + final AvroVariantDeserializationSchema expectedDeser = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + new CachedSchemaCoderProvider( + null, + REGISTRY_URL, + SCHEMA_CACHE_CAPACITY, + expectedProperties)), + false, + SCHEMA_CACHE_CAPACITY, + InternalTypeInfo.of(RowType.of(new VariantType()))); + + final DynamicTableSource actualSource = + createTableSource(SCHEMA, getOptionsWithProperties()); + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + 
DeserializationSchema actualDeser = + scanSourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType()); + + assertThat(actualDeser).isEqualTo(expectedDeser); + } + + @Test + void testMissingUrl() { + final Map options = getDefaultOptions(); + options.remove("avro-variant-confluent.url"); + + assertThatThrownBy(() -> createTableSource(SCHEMA, options)) + .isInstanceOf(ValidationException.class); + } + + @Test + void testDecodingFormatWithSchemaMetadata() { + final AvroVariantDeserializationSchema expectedDeser = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + new CachedSchemaCoderProvider( + null, REGISTRY_URL, SCHEMA_CACHE_CAPACITY, null)), + true, + SCHEMA_CACHE_CAPACITY, + InternalTypeInfo.of( + RowType.of( + new VariantType(), + new VarCharType(true, VarCharType.MAX_LENGTH)))); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions()); + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + scanSourceMock.valueFormat.applyReadableMetadata(Collections.singletonList("schema")); + + DeserializationSchema actualDeser = + scanSourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType()); + + assertThat(actualDeser).isEqualTo(expectedDeser); + } + + private Map getDefaultOptions() { + final Map options = new HashMap<>(); + options.put("connector", TestDynamicTableFactory.IDENTIFIER); + options.put("target", "MyTarget"); + options.put("buffer-size", "1000"); + + options.put("format", RegistryAvroVariantFormatFactory.IDENTIFIER); + options.put("avro-variant-confluent.url", REGISTRY_URL); + return options; + } + + private Map getOptionsWithProperties() { + final Map options = getDefaultOptions(); + String prefix = "avro-variant-confluent."; + options.put(prefix + "ssl.keystore.location", "/test-keystore.jks"); + 
options.put(prefix + "ssl.keystore.password", "123456"); + options.put(prefix + "ssl.truststore.location", "/test-keystore.jks"); + options.put(prefix + "ssl.truststore.password", "123456"); + options.put(prefix + "basic-auth.credentials-source", "USER_INFO"); + options.put(prefix + "basic-auth.user-info", "user:pwd"); + options.put(prefix + "properties.bearer.auth.token", "CUSTOM"); + return options; + } + + private Map getExpectedProperties() { + Map expectedProperties = new HashMap<>(); + expectedProperties.put("schema.registry.ssl.keystore.location", "/test-keystore.jks"); + expectedProperties.put("schema.registry.ssl.keystore.password", "123456"); + expectedProperties.put("schema.registry.ssl.truststore.location", "/test-keystore.jks"); + expectedProperties.put("schema.registry.ssl.truststore.password", "123456"); + expectedProperties.put("basic.auth.credentials.source", "USER_INFO"); + expectedProperties.put("basic.auth.user.info", "user:pwd"); + expectedProperties.put("bearer.auth.token", "CUSTOM"); + return expectedProperties; + } +} diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDecodingFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDecodingFormat.java new file mode 100644 index 0000000000000..4a180bc078a28 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDecodingFormat.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VariantType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * {@link DecodingFormat} for Avro Variant that reads Avro records with dynamic writer schemas and + * converts them to Variant. Supports format-level metadata columns (e.g., Avro schema string). 
+ */
+@Internal
+public class AvroVariantDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    private static final String SCHEMA_METADATA_KEY = "schema";
+    private static final DataType SCHEMA_METADATA_TYPE = DataTypes.STRING();
+
+    private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+    private final int maxCacheSize;
+    private List<String> metadataKeys = Collections.emptyList();
+
+    public AvroVariantDecodingFormat(
+            SchemaCoder.SchemaCoderProvider schemaCoderProvider, int maxCacheSize) {
+        this.schemaCoderProvider = schemaCoderProvider;
+        this.maxCacheSize = maxCacheSize;
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+
+        validatePhysicalType((RowType) physicalDataType.getLogicalType());
+
+        final boolean includeSchemaMetadata = metadataKeys.contains(SCHEMA_METADATA_KEY);
+        final DataType producedDataType;
+        if (includeSchemaMetadata) {
+            producedDataType =
+                    DataTypeUtils.appendRowFields(
+                            physicalDataType,
+                            Collections.singletonList(
+                                    DataTypes.FIELD(SCHEMA_METADATA_KEY, SCHEMA_METADATA_TYPE)));
+        } else {
+            producedDataType = physicalDataType;
+        }
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new AvroVariantDeserializationSchema(
+                new RegistryWriterAvroDeserializationSchema(schemaCoderProvider),
+                includeSchemaMetadata,
+                maxCacheSize,
+                producedTypeInfo);
+    }
+
+    private void validatePhysicalType(RowType rowType) {
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == 1,
+                "avro-variant format requires exactly one physical column of type VARIANT, "
+                        + "but found %s columns",
+                rowType.getFieldCount());
+
+        LogicalType fieldType = rowType.getTypeAt(0);
+        Preconditions.checkArgument(
+                fieldType instanceof VariantType,
+                "avro-variant format requires the physical column to be VARIANT type, "
+                        + "but found %s",
+                fieldType);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return
Collections.singletonMap(SCHEMA_METADATA_KEY, SCHEMA_METADATA_TYPE); + } + + @Override + public void applyReadableMetadata(List metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } +} diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java new file mode 100644 index 0000000000000..2be3237d86760 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchema.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.AvroToVariantDataConverters.AvroToVariantDataConverter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.types.variant.Variant; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Deserialization schema from Avro bytes to {@link RowData} with a Variant physical column and + * optional metadata columns appended at the end. Wraps a {@link + * RegistryWriterAvroDeserializationSchema} that produces {@link GenericRecord}, then converts to + * Variant and assembles the output row. + * + *
+ * <p>Converter cache performance will be better when combined with CachedSchemaCoderProviders
+ * that return the same Schema object reference in case of matching schemas.
+ */
+@Internal
+public class AvroVariantDeserializationSchema implements DeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AvroVariantDeserializationSchema.class);
+
+    private final RegistryWriterAvroDeserializationSchema innerDeserializationSchema;
+    private final boolean includeSchemaMetadata;
+    private final int maxCacheSize;
+    private final TypeInformation<RowData> typeInfo;
+
+    transient Map<Schema, AvroToVariantDataConverter> converterCache;
+
+    public AvroVariantDeserializationSchema(
+            RegistryWriterAvroDeserializationSchema innerDeserializationSchema,
+            boolean includeSchemaMetadata,
+            int maxCacheSize,
+            TypeInformation<RowData> typeInfo) {
+        this.innerDeserializationSchema = Objects.requireNonNull(innerDeserializationSchema);
+        this.includeSchemaMetadata = includeSchemaMetadata;
+        this.maxCacheSize = maxCacheSize;
+        this.typeInfo = Objects.requireNonNull(typeInfo);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        this.innerDeserializationSchema.open(context);
+        this.converterCache =
+                new LinkedHashMap<>(maxCacheSize) {
+                    @Override
+                    protected boolean removeEldestEntry(
+                            Map.Entry<Schema, AvroToVariantDataConverter> eldest) {
+                        if (size() > maxCacheSize) {
+                            LOG.info(
+                                    "Evicting schema converter from cache, size exceeded {}",
+                                    maxCacheSize);
+                            return true;
+                        }
+                        return false;
+                    }
+                };
+    }
+
+    @Override
+    public RowData deserialize(@Nullable byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+
+        GenericRecord record = innerDeserializationSchema.deserialize(message);
+        if (record == null) {
+            return null;
+        }
+
+        GenericRowData row = new GenericRowData(includeSchemaMetadata ?
2 : 1); + Schema writerSchema = record.getSchema(); + AvroToVariantDataConverter converter = getOrCreateConverter(writerSchema); + Variant variant = converter.convert(record); + row.setField(0, variant); + + if (includeSchemaMetadata) { + row.setField(1, StringData.fromString(writerSchema.toString())); + } + + return row; + } + + private AvroToVariantDataConverter getOrCreateConverter(Schema writerSchema) { + return converterCache.computeIfAbsent( + writerSchema, AvroToVariantDataConverters::createVariantConverter); + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return typeInfo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AvroVariantDeserializationSchema that = (AvroVariantDeserializationSchema) o; + return includeSchemaMetadata == that.includeSchemaMetadata + && maxCacheSize == that.maxCacheSize + && innerDeserializationSchema.equals(that.innerDeserializationSchema) + && typeInfo.equals(that.typeInfo); + } + + @Override + public int hashCode() { + return Objects.hash( + includeSchemaMetadata, maxCacheSize, innerDeserializationSchema, typeInfo); + } +} diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java new file mode 100644 index 0000000000000..21e5b16475c85 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchema.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Objects; + +/** + * Deserialization schema that deserializes from Avro binary format using a {@link SchemaCoder} to + * resolve the writer schema dynamically per record. Unlike {@link + * RegistryAvroDeserializationSchema} which projects records into a fixed reader schema, this class + * uses the writer schema as both reader and writer — preserving all fields as-is. + * + *
+ * <p>Designed for use cases where the Avro schema is not known at table creation time and varies
+ * per record.
+ */
+public class RegistryWriterAvroDeserializationSchema
+        implements DeserializationSchema<GenericRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+    private transient SchemaCoder schemaCoder;
+    private transient MutableByteArrayInputStream inputStream;
+    private transient BinaryDecoder decoder;
+    private transient GenericDatumReader<GenericRecord> datumReader;
+
+    public RegistryWriterAvroDeserializationSchema(
+            SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+        this.schemaCoderProvider = Objects.requireNonNull(schemaCoderProvider);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        GenericData genericData = new GenericData(cl);
+        this.datumReader = new GenericDatumReader<>(null, null, genericData);
+        this.inputStream = new MutableByteArrayInputStream();
+        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+        this.schemaCoder = schemaCoderProvider.get();
+    }
+
+    @Override
+    public GenericRecord deserialize(@Nullable byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+
+        inputStream.setBuffer(message);
+        Schema writerSchema = schemaCoder.readSchema(inputStream);
+        datumReader.setSchema(writerSchema);
+        datumReader.setExpected(writerSchema);
+
+        return datumReader.read(null, decoder);
+    }
+
+    @Override
+    public boolean isEndOfStream(GenericRecord nextElement) {
+        return false;
+    }
+
+    // This is unused as this is composed in AvroVariantDeserializationSchema
+    // which has its own type info.
+    @Override
+    public TypeInformation<GenericRecord> getProducedType() {
+        return TypeInformation.of(GenericRecord.class);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RegistryWriterAvroDeserializationSchema that = (RegistryWriterAvroDeserializationSchema) o;
+        return schemaCoderProvider.equals(that.schemaCoderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schemaCoderProvider);
+    }
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java
new file mode 100644
index 0000000000000..2ed3b5fbe4e8e
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroVariantDeserializationSchemaTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.utils.TestDataGenerator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VariantType; +import org.apache.flink.types.variant.Variant; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Random; + +import static org.apache.flink.formats.avro.utils.AvroTestUtils.fixedRoundRobinSchemaCoderProvider; +import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord; +import static org.assertj.core.api.Assertions.assertThat; + +class AvroVariantDeserializationSchemaTest { + + private static final Address ADDRESS = TestDataGenerator.generateRandomAddress(new Random()); + private static final Schema ADDRESS_SCHEMA = Address.getClassSchema(); + private static final Schema ADDRESS_SCHEMA_V2 = + new Schema.Parser() + .parse( + "{\"namespace\": \"org.apache.flink.formats.avro.generated\"," + + " \"type\": \"record\"," + + " \"name\": \"Address\"," + + " \"fields\": [" + + " {\"name\": \"num\", \"type\": \"int\"}," + + " {\"name\": \"street\", \"type\": \"string\"}," + + " {\"name\": \"city\", \"type\": \"string\"}," + + " {\"name\": \"state\", \"type\": \"string\"}," + + " {\"name\": \"zip\", \"type\": \"string\"}," + + " {\"name\": \"country\", \"type\": \"string\"}" + + " ]}"); + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testDeserialization(boolean 
includeSchemaMetadata) throws Exception { + RowType rowType = + includeSchemaMetadata + ? RowType.of( + new VariantType(), new VarCharType(true, VarCharType.MAX_LENGTH)) + : RowType.of(new VariantType()); + AvroVariantDeserializationSchema deserializer = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA)), + includeSchemaMetadata, + 10, + InternalTypeInfo.of(rowType)); + deserializer.open(new DummyInitializationContext()); + + RowData rowData = deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA)); + assertThat(rowData.getArity()).isEqualTo(includeSchemaMetadata ? 2 : 1); + + Variant variant = rowData.getVariant(0); + assertThat(variant.getField("num").getInt()).isEqualTo(ADDRESS.getNum()); + assertThat(variant.getField("street").getString()).isEqualTo(ADDRESS.getStreet()); + assertThat(variant.getField("city").getString()).isEqualTo(ADDRESS.getCity()); + assertThat(variant.getField("state").getString()).isEqualTo(ADDRESS.getState()); + assertThat(variant.getField("zip").getString()).isEqualTo(ADDRESS.getZip()); + + if (includeSchemaMetadata) { + assertThat(rowData.getString(1).toString()).isEqualTo(ADDRESS_SCHEMA.toString()); + } + } + + @Test + void testConverterCacheEviction() throws Exception { + GenericRecord v2Record = new GenericData.Record(ADDRESS_SCHEMA_V2); + v2Record.put("num", ADDRESS.getNum()); + v2Record.put("street", ADDRESS.getStreet()); + v2Record.put("city", ADDRESS.getCity()); + v2Record.put("state", ADDRESS.getState()); + v2Record.put("zip", ADDRESS.getZip()); + v2Record.put("country", "US"); + + AvroVariantDeserializationSchema deserializer = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + fixedRoundRobinSchemaCoderProvider( + ADDRESS_SCHEMA, ADDRESS_SCHEMA_V2)), + false, + 1, + InternalTypeInfo.of(RowType.of(new VariantType()))); + deserializer.open(new DummyInitializationContext()); + + 
deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA)); + assertThat(deserializer.converterCache).hasSize(1); + assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA); + + // ADDRESS_SCHEMA_V2 triggers eviction of ADDRESS_SCHEMA (maxCacheSize=1) + deserializer.deserialize(writeRecord(v2Record, ADDRESS_SCHEMA_V2)); + assertThat(deserializer.converterCache).hasSize(1); + assertThat(deserializer.converterCache).doesNotContainKey(ADDRESS_SCHEMA); + assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA_V2); + + // ADDRESS_SCHEMA re-enters the cache, deserialization still works + deserializer.deserialize(writeRecord(ADDRESS, ADDRESS_SCHEMA)); + assertThat(deserializer.converterCache).hasSize(1); + assertThat(deserializer.converterCache).containsKey(ADDRESS_SCHEMA); + } + + @Test + void testDeserializeNull() throws Exception { + AvroVariantDeserializationSchema deserializer = + new AvroVariantDeserializationSchema( + new RegistryWriterAvroDeserializationSchema( + fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA)), + false, + 10, + InternalTypeInfo.of(RowType.of(new VariantType()))); + deserializer.open(new DummyInitializationContext()); + + assertThat(deserializer.deserialize(null)).isNull(); + } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java new file mode 100644 index 0000000000000..c00883353bebf --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryWriterAvroDeserializationSchemaTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.utils.TestDataGenerator; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import static org.apache.flink.formats.avro.utils.AvroTestUtils.fixedRoundRobinSchemaCoderProvider; +import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord; +import static org.assertj.core.api.Assertions.assertThat; + +class RegistryWriterAvroDeserializationSchemaTest { + + private static final Address address = TestDataGenerator.generateRandomAddress(new Random()); + private static final Schema ADDRESS_SCHEMA = Address.getClassSchema(); + + @Test + void testGenericRecordReadWithFullSchema() throws Exception { + RegistryWriterAvroDeserializationSchema deserializer = + new RegistryWriterAvroDeserializationSchema( + fixedRoundRobinSchemaCoderProvider(ADDRESS_SCHEMA)); + + deserializer.open(new DummyInitializationContext()); + + GenericRecord record = deserializer.deserialize(writeRecord(address, ADDRESS_SCHEMA)); + assertThat(record.getSchema()).isSameAs(ADDRESS_SCHEMA); + assertThat(record.get("num")).isEqualTo(address.getNum()); + 
assertThat(record.get("street").toString()).isEqualTo(address.getStreet()); + assertThat(record.get("city").toString()).isEqualTo(address.getCity()); + assertThat(record.get("state").toString()).isEqualTo(address.getState()); + assertThat(record.get("zip").toString()).isEqualTo(address.getZip()); + } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java index b4e12f8ca10ee..ec76f65d993b6 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; +import org.apache.flink.formats.avro.SchemaCoder; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; @@ -42,6 +43,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -55,6 +57,8 @@ import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** Utilities for creating Avro Schemas. */ public final class AvroTestUtils { @@ -397,4 +401,25 @@ public static Encoder createEncoder( return EncoderFactory.get().binaryEncoder(outputStream, null); } } + + public static SchemaCoder.SchemaCoderProvider fixedRoundRobinSchemaCoderProvider( + Schema... 
schemas) { + return fixedRoundRobinSchemaCoderProvider(Arrays.asList(schemas)); + } + + public static SchemaCoder.SchemaCoderProvider fixedRoundRobinSchemaCoderProvider( + List schemas) { + return () -> { + AtomicInteger index = new AtomicInteger(0); + return new SchemaCoder() { + @Override + public Schema readSchema(InputStream in) { + return schemas.get(index.getAndIncrement() % schemas.size()); + } + + @Override + public void writeSchema(Schema schema, OutputStream out) throws IOException {} + }; + }; + } }