[FLINK-39605][Flink-Formats] Add avro variant confluent format #28115
| @@ -0,0 +1,271 @@ | ||
| --- | ||
| title: Confluent Avro Variant | ||
| weight: 5 | ||
| type: docs | ||
| --- | ||
| <!-- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|
|
||
| # Confluent Avro Variant Format | ||
|
|
||
| {{< label "Format: Deserialization Schema" >}} | ||
|
|
||
| The Avro Variant Schema Registry (``avro-variant-confluent``) format allows you to read records that were serialized by the ``io.confluent.kafka.serializers.KafkaAvroSerializer`` and deserialize them into Flink's ``VARIANT`` type. | ||
|
|
||
| Unlike the [Confluent Avro]({{< ref "docs/connectors/table/formats/avro-confluent" >}}) format which requires the table schema to match the Avro schema, this format reads the entire Avro record into a single ``VARIANT`` column. The writer schema is resolved dynamically per record from the configured Confluent Schema Registry based on the schema version id encoded in the record. The Avro schema does not need to be known at table creation time and can evolve across records. | ||
|
|
||
| This format is deserialization-only and does not support writing. | ||
|
|
||
| Dependencies | ||
| ------------ | ||
|
|
||
| {{< sql_download_table "avro-confluent" >}} | ||
|
|
||
| For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent's maven repository at `https://packages.confluent.io/maven/` is configured in your project's build files. | ||
|
|
||
| How to read with the Avro-Variant-Confluent format | ||
| -------------- | ||
|
|
||
| This format is particularly useful when a single Kafka topic carries multiple event types with different Avro schemas. With the standard ``avro-confluent`` format, you would need a separate table for each event type, each with a fixed schema. With ``avro-variant-confluent``, a single table can ingest all event types and you can route or filter them at query time. It can also be useful when reading from multiple Kafka topics via a topic pattern. | ||
|
|
||
| Example of a table that reads Avro records from Kafka into a VARIANT column: | ||
|
|
||
| ```sql | ||
| CREATE TABLE kafka_source ( | ||
| data VARIANT | ||
| ) WITH ( | ||
| 'connector' = 'kafka', | ||
| 'topic' = 'user_events', | ||
| 'properties.bootstrap.servers' = 'localhost:9092', | ||
| 'properties.group.id' = 'my-group', | ||
| 'format' = 'avro-variant-confluent', | ||
| 'avro-variant-confluent.url' = 'http://localhost:8081' | ||
| ); | ||
|
|
||
| -- Access variant fields using bracket notation | ||
| SELECT | ||
| data['id'] AS id, | ||
| data['name'] AS name, | ||
| data['email'] AS email | ||
| FROM kafka_source; | ||
| ``` | ||
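When a topic mixes event types, records can be routed at query time by filtering on a discriminator field. The sketch below is illustrative only: it assumes the payloads carry an `event_type` field and that variant values can be cast to `STRING`; neither is guaranteed by the format itself.

```sql
-- Illustrative: assumes each payload contains an 'event_type' field
SELECT
  data['user_id'] AS user_id,
  data['payload'] AS payload
FROM kafka_source
WHERE CAST(data['event_type'] AS STRING) = 'user_signup';
```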
|
|
||
| --- | ||
|
|
||
| Example of reading from multiple topics with different Avro schemas using a topic pattern: | ||
|
|
||
| ```sql | ||
| -- Read from multiple topics with different Avro schemas using a single table | ||
| CREATE TABLE all_orders ( | ||
| data VARIANT | ||
| ) WITH ( | ||
| 'connector' = 'kafka', | ||
| 'topic-pattern' = 'orders-.*', | ||
| .. | ||
| 'format' = 'avro-variant-confluent', | ||
| 'avro-variant-confluent.url' = 'http://localhost:8081' | ||
| ); | ||
|
|
||
| -- Query across all order topics regardless of schema differences | ||
| SELECT | ||
| data['order_id'] AS order_id, | ||
| data['customer_id'] AS customer_id, | ||
| data['total'] AS total, | ||
| data['region'] AS region | ||
| FROM all_orders; | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| The format also supports an optional ``schema`` metadata column that exposes the Avro writer schema string for each record: | ||
|
|
||
| ```sql | ||
| CREATE TABLE kafka_source ( | ||
| data VARIANT, | ||
| avro_schema STRING METADATA FROM 'schema' | ||
| ) WITH ( | ||
| 'connector' = 'kafka', | ||
| 'topic' = 'user_events', | ||
| 'properties.bootstrap.servers' = 'localhost:9092', | ||
| 'properties.group.id' = 'my-group', | ||
| 'format' = 'avro-variant-confluent', | ||
| 'avro-variant-confluent.url' = 'http://localhost:8081' | ||
| ); | ||
|
|
||
| -- Query fields and inspect the writer schema | ||
| SELECT | ||
| data['id'] AS id, | ||
| data['name'] AS name, | ||
| avro_schema | ||
| FROM kafka_source; | ||
| ``` | ||
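Because the writer schema is resolved per record, the ``schema`` metadata column can also be used to observe schema evolution across a topic, for example by counting records per distinct writer schema. A sketch using the table defined above:

```sql
-- Count records per distinct writer schema to observe schema evolution
SELECT avro_schema, COUNT(*) AS record_count
FROM kafka_source
GROUP BY avro_schema;
```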
|
|
||
| Format Options | ||
| ---------------- | ||
|
|
||
| <table class="table table-bordered"> | ||
| <thead> | ||
| <tr> | ||
| <th class="text-left" style="width: 25%">Option</th> | ||
| <th class="text-center" style="width: 8%">Required</th> | ||
| <th class="text-center" style="width: 8%">Forwarded</th> | ||
| <th class="text-center" style="width: 7%">Default</th> | ||
| <th class="text-center" style="width: 10%">Type</th> | ||
| <th class="text-center" style="width: 42%">Description</th> | ||
| </tr> | ||
| </thead> | ||
| <tbody> | ||
| <tr> | ||
| <td><h5>format</h5></td> | ||
| <td>required</td> | ||
| <td>no</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Specify what format to use; must be <code>'avro-variant-confluent'</code>.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.url</h5></td> | ||
| <td>required</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>The URL of the Confluent Schema Registry to fetch schemas.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.properties</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>Map</td> | ||
| <td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.ssl.keystore.location</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Location of the SSL keystore file.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.ssl.keystore.password</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Password for SSL keystore</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.ssl.truststore.location</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Location of the SSL truststore file.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.ssl.truststore.password</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Password for SSL truststore</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.basic-auth.credentials-source</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Basic auth credentials source for Schema Registry</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.basic-auth.user-info</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Basic auth user info for Schema Registry</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.bearer-auth.credentials-source</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Bearer auth credentials source for Schema Registry</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>avro-variant-confluent.bearer-auth.token</h5></td> | ||
| <td>optional</td> | ||
| <td>yes</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>Bearer auth token for Schema Registry</td> | ||
| </tr> | ||
| </tbody> | ||
| </table> | ||
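As a sketch of how the security options above fit together, the following DDL configures TLS and basic auth against a secured registry. Hostnames, file paths, and credentials are placeholders; `USER_INFO` is the Confluent credentials-source value for inline user/password credentials.

```sql
CREATE TABLE secured_source (
  data VARIANT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'broker.example.com:9093',
  'format' = 'avro-variant-confluent',
  'avro-variant-confluent.url' = 'https://registry.example.com:8081',
  'avro-variant-confluent.ssl.truststore.location' = '/etc/ssl/truststore.jks',
  'avro-variant-confluent.ssl.truststore.password' = 'changeit',
  'avro-variant-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-variant-confluent.basic-auth.user-info' = 'user:password'
);
```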
|
|
||
| Metadata | ||
| -------- | ||
|
|
||
| The format supports the following metadata column: | ||
|
|
||
| <table class="table table-bordered"> | ||
| <thead> | ||
| <tr> | ||
| <th class="text-left" style="width: 25%">Key</th> | ||
| <th class="text-center" style="width: 20%">Data Type</th> | ||
| <th class="text-center" style="width: 55%">Description</th> | ||
| </tr> | ||
| </thead> | ||
| <tbody> | ||
| <tr> | ||
| <td><code>schema</code></td> | ||
| <td><code>STRING NOT NULL</code></td> | ||
| <td>The Avro writer schema string (JSON format) used to serialize the record.</td> | ||
| </tr> | ||
| </tbody> | ||
| </table> | ||
|
|
||
| Data Type Mapping | ||
| ---------------- | ||
|
|
||
| All Avro types are converted to Flink's VARIANT type. The following Avro logical types receive special handling: | ||
|
|
||
| | Avro Type | Avro Logical Type | Variant Representation | | ||
| |-----------|-------------------|----------------------| | ||
| | INT | date | DATE | | ||
| | INT | time-millis | LONG (microseconds) | | ||
| | LONG | timestamp-millis | TIMESTAMP | | ||
| | LONG | timestamp-micros | TIMESTAMP | | ||
| | BYTES/FIXED | decimal | DECIMAL | | ||
| | RECORD | - | OBJECT | | ||
| | ARRAY | - | ARRAY | | ||
| | MAP | - | OBJECT (string keys) | | ||
| | UNION (null + type) | - | nullable inner type | | ||
| | STRING, ENUM | - | STRING | | ||
| | BOOLEAN | - | BOOLEAN | | ||
| | INT | - | INT | | ||
| | LONG | - | LONG | | ||
| | FLOAT | - | FLOAT | | ||
| | DOUBLE | - | DOUBLE | | ||
| | BYTES, FIXED | - | BYTES | | ||
| | NULL | - | NULL | | ||
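The mapping above can be applied in queries by casting variant values back to concrete SQL types. This is a sketch only: field names are invented, and whether a given cast is supported depends on the state of variant accessor support (see FLINK-37922).

```sql
-- Illustrative: cast variant values back to the SQL types from the table above
SELECT
  CAST(data['order_date'] AS DATE)         AS order_date,  -- Avro int + date
  CAST(data['created_at'] AS TIMESTAMP(3)) AS created_at,  -- Avro long + timestamp-millis
  CAST(data['total'] AS DECIMAL(10, 2))    AS total        -- Avro bytes/fixed + decimal
FROM all_orders;
```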
|
|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.formats.avro.registry.confluent; | ||
|
|
||
| import org.apache.flink.annotation.Internal; | ||
| import org.apache.flink.api.common.serialization.DeserializationSchema; | ||
| import org.apache.flink.configuration.ConfigOption; | ||
| import org.apache.flink.configuration.ReadableConfig; | ||
| import org.apache.flink.formats.avro.AvroVariantDecodingFormat; | ||
| import org.apache.flink.table.connector.format.DecodingFormat; | ||
| import org.apache.flink.table.data.RowData; | ||
| import org.apache.flink.table.factories.DeserializationFormatFactory; | ||
| import org.apache.flink.table.factories.DynamicTableFactory; | ||
| import org.apache.flink.table.factories.FactoryUtil; | ||
|
|
||
| import java.util.HashSet; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD; | ||
| import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL; | ||
|
|
||
| /** | ||
| * Table format factory for providing configured instances of Confluent Schema Registry Avro to | ||
| * Variant {@link DeserializationSchema}. Deserialization only — no serialization support. | ||
| */ | ||
| @Internal | ||
| public class RegistryAvroVariantFormatFactory implements DeserializationFormatFactory { | ||
|
|
||
| public static final String IDENTIFIER = "avro-variant-confluent"; | ||
|
Contributor: avro-variant-confluent or avro-confluent-variant? I wonder what your thinking is? Prefixing with avro-confluent for all Avro Confluent formats appeals to me.
Contributor (Author): My idea was input-output format, then registry, as we can have any pluggable internal schema registries. But I am open to suggestions here. |
||
|
|
||
| private static final int SCHEMA_CACHE_CAPACITY = 1000; | ||
|
|
||
| @Override | ||
| public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( | ||
| DynamicTableFactory.Context context, ReadableConfig formatOptions) { | ||
| FactoryUtil.validateFactoryOptions(this, formatOptions); | ||
|
|
||
| String schemaRegistryURL = formatOptions.get(URL); | ||
| Map<String, String> optionalPropertiesMap = | ||
| RegistryAvroFormatFactory.buildOptionalPropertiesMap(formatOptions); | ||
|
|
||
| return new AvroVariantDecodingFormat( | ||
| new CachedSchemaCoderProvider( | ||
| null, schemaRegistryURL, SCHEMA_CACHE_CAPACITY, optionalPropertiesMap), | ||
| SCHEMA_CACHE_CAPACITY); | ||
| } | ||
|
|
||
| @Override | ||
| public String factoryIdentifier() { | ||
| return IDENTIFIER; | ||
| } | ||
|
|
||
| @Override | ||
| public Set<ConfigOption<?>> requiredOptions() { | ||
| Set<ConfigOption<?>> options = new HashSet<>(); | ||
| options.add(URL); | ||
| return options; | ||
| } | ||
|
|
||
| @Override | ||
| public Set<ConfigOption<?>> optionalOptions() { | ||
| Set<ConfigOption<?>> options = new HashSet<>(); | ||
| options.add(PROPERTIES); | ||
| options.add(SSL_KEYSTORE_LOCATION); | ||
| options.add(SSL_KEYSTORE_PASSWORD); | ||
| options.add(SSL_TRUSTSTORE_LOCATION); | ||
| options.add(SSL_TRUSTSTORE_PASSWORD); | ||
| options.add(BASIC_AUTH_CREDENTIALS_SOURCE); | ||
| options.add(BASIC_AUTH_USER_INFO); | ||
| options.add(BEARER_AUTH_CREDENTIALS_SOURCE); | ||
| options.add(BEARER_AUTH_TOKEN); | ||
| return options; | ||
| } | ||
|
|
||
| @Override | ||
| public Set<ConfigOption<?>> forwardOptions() { | ||
| Set<ConfigOption<?>> options = new HashSet<>(); | ||
| options.add(URL); | ||
| options.add(PROPERTIES); | ||
| options.add(SSL_KEYSTORE_LOCATION); | ||
| options.add(SSL_KEYSTORE_PASSWORD); | ||
| options.add(SSL_TRUSTSTORE_LOCATION); | ||
| options.add(SSL_TRUSTSTORE_PASSWORD); | ||
| options.add(BASIC_AUTH_CREDENTIALS_SOURCE); | ||
| options.add(BASIC_AUTH_USER_INFO); | ||
| options.add(BEARER_AUTH_CREDENTIALS_SOURCE); | ||
| options.add(BEARER_AUTH_TOKEN); | ||
| return options; | ||
| } | ||
| } | ||
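The factory above is discovered by Flink's `FactoryUtil` via Java's service-loader mechanism and selected by matching the `'format'` option against `factoryIdentifier()`. Below is a simplified, Flink-free sketch of that lookup pattern; every name in it is illustrative and none of it is Flink's actual API.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FactoryLookupSketch {

    /** Minimal stand-in for a table format factory keyed by an identifier. */
    interface FormatFactory {
        String factoryIdentifier();
    }

    /** Index discovered factories by identifier, as service-loader discovery would. */
    static Map<String, FormatFactory> index(List<FormatFactory> discovered) {
        return discovered.stream()
                .collect(Collectors.toMap(FormatFactory::factoryIdentifier, Function.identity()));
    }

    /** Resolve the factory named by the 'format' option, if one was registered. */
    static Optional<FormatFactory> resolve(Map<String, FormatFactory> registry, String format) {
        return Optional.ofNullable(registry.get(format));
    }

    public static void main(String[] args) {
        FormatFactory variantFactory = () -> "avro-variant-confluent";
        FormatFactory avroFactory = () -> "avro-confluent";
        Map<String, FormatFactory> registry = index(List.of(variantFactory, avroFactory));

        System.out.println(resolve(registry, "avro-variant-confluent").isPresent()); // true
        System.out.println(resolve(registry, "json").isPresent());                   // false
    }
}
```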
Could you add docs for this please? A SQL example would be good.
I assume this will not be that useful until we have more of https://issues.apache.org/jira/browse/FLINK-37922 implemented.
Yes planning to add a doc similar to https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/avro-confluent/.
But wanted to get an initial review on naming/code, before the documentation.
We have some accessor methods added as part of #27330.
I am also working on Flink + Iceberg integration here apache/iceberg#15471.
This would allow exploding Variant type columns and creating and persisting data to Iceberg tables.
Added doc in 440ea57