271 changes: 271 additions & 0 deletions docs/content/docs/connectors/table/formats/avro-variant-confluent.md
@@ -0,0 +1,271 @@
---
title: Confluent Avro Variant
weight: 5
type: docs
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Confluent Avro Variant Format

{{< label "Format: Deserialization Schema" >}}

The Avro Variant Schema Registry (``avro-variant-confluent``) format allows you to read records that were serialized by the ``io.confluent.kafka.serializers.KafkaAvroSerializer`` and deserialize them into Flink's ``VARIANT`` type.

Unlike the [Confluent Avro]({{< ref "docs/connectors/table/formats/avro-confluent" >}}) format which requires the table schema to match the Avro schema, this format reads the entire Avro record into a single ``VARIANT`` column. The writer schema is resolved dynamically per record from the configured Confluent Schema Registry based on the schema version id encoded in the record. The Avro schema does not need to be known at table creation time and can evolve across records.
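The per-record schema resolution relies on the Confluent wire format: each serialized message begins with a magic byte (`0x0`) followed by the 4-byte, big-endian id of the writer schema in the registry, before the Avro payload. As a minimal sketch (the helper below is illustrative and not part of the format's API), extracting that id looks like this:

```java
import java.nio.ByteBuffer;

public class ConfluentWireFormat {

    /**
     * Extracts the schema registry id from a Confluent-framed message:
     * 1 magic byte (must be 0x0) + 4-byte big-endian schema id + Avro payload.
     */
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt(); // ByteBuffer reads big-endian by default
    }

    public static void main(String[] args) {
        // Frame carrying schema id 42, followed by a dummy Avro payload.
        byte[] message = {0x0, 0x00, 0x00, 0x00, 0x2A, 0x10, 0x06};
        System.out.println(schemaId(message)); // 42
    }
}
```

The format performs this lookup per record, caching resolved schemas, which is what lets the Avro schema evolve across records without a table schema change.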

This format is deserialization-only and does not support writing.

Dependencies
------------

{{< sql_download_table "avro-confluent" >}}

For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent's maven repository at `https://packages.confluent.io/maven/` is configured in your project's build files.

How to read with the Avro-Variant-Confluent format
--------------

This format is particularly useful when a single Kafka topic carries multiple event types with different Avro schemas. With the standard ``avro-confluent`` format, you would need a separate table for each event type, each with a fixed schema. With ``avro-variant-confluent``, a single table can ingest all event types and you can route or filter them at query time. It can also be useful when reading from multiple Kafka topics with a topic pattern.

Example of a table that reads Avro records from Kafka into a VARIANT column:

```sql
CREATE TABLE kafka_source (
  data VARIANT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'my-group',
  'format' = 'avro-variant-confluent',
  'avro-variant-confluent.url' = 'http://localhost:8081'
);

-- Access variant fields using bracket notation
SELECT
  data['id'] AS id,
  data['name'] AS name,
  data['email'] AS email
FROM kafka_source;
```

---

Example of reading from multiple topics with different Avro schemas using a topic pattern:

```sql
-- Read from multiple topics with different Avro schemas using a single table
CREATE TABLE all_orders (
  data VARIANT
) WITH (
  'connector' = 'kafka',
  'topic-pattern' = 'orders-*',
  ..
  'format' = 'avro-variant-confluent',
  'avro-variant-confluent.url' = 'http://localhost:8081'
);

-- Query across all order topics regardless of schema differences
SELECT
  data['order_id'] AS order_id,
  data['customer_id'] AS customer_id,
  data['total'] AS total,
  data['region'] AS region
FROM all_orders;
```

---

The format also supports an optional ``schema`` metadata column that exposes the Avro writer schema string for each record:

```sql
CREATE TABLE kafka_source (
  data VARIANT,
  avro_schema STRING METADATA FROM 'schema'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'my-group',
  'format' = 'avro-variant-confluent',
  'avro-variant-confluent.url' = 'http://localhost:8081'
);

-- Query fields and inspect the writer schema
SELECT
  data['id'] AS id,
  data['name'] AS name,
  avro_schema
FROM kafka_source;
```

Format Options
----------------

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be <code>'avro-variant-confluent'</code>.</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.url</h5></td>
<td>required</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The URL of the Confluent Schema Registry to fetch schemas.</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.properties</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Map</td>
<td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.ssl.keystore.location</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location / File of SSL keystore</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.ssl.keystore.password</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password for SSL keystore</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.ssl.truststore.location</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location / File of SSL truststore</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.ssl.truststore.password</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password for SSL truststore</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.basic-auth.credentials-source</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Basic auth credentials source for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.basic-auth.user-info</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Basic auth user info for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.bearer-auth.credentials-source</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Bearer auth credentials source for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-variant-confluent.bearer-auth.token</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Bearer auth token for Schema Registry</td>
</tr>
</tbody>
</table>
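The ``avro-variant-confluent.properties`` map interacts with the dedicated Flink options by a simple precedence rule: on a key collision, the dedicated Flink option wins. A hedged sketch of that merge behavior (illustrative only; the class and method names below are hypothetical, not the factory's actual internals):

```java
import java.util.HashMap;
import java.util.Map;

public class OptionPrecedenceDemo {

    /**
     * Merges the free-form properties map with values derived from dedicated
     * Flink options. Dedicated options override colliding keys.
     */
    static Map<String, String> mergeOptions(
            Map<String, String> propertiesMap, Map<String, String> flinkOptions) {
        Map<String, String> merged = new HashMap<>(propertiesMap);
        merged.putAll(flinkOptions); // dedicated Flink options take precedence
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("basic.auth.user.info", "from-properties-map");
        Map<String, String> flink = new HashMap<>();
        flink.put("basic.auth.user.info", "from-flink-option");
        // prints "from-flink-option"
        System.out.println(mergeOptions(props, flink).get("basic.auth.user.info"));
    }
}
```

In practice this means the properties map is best reserved for registry client settings that have no dedicated Flink option.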

Metadata
--------

The format supports the following metadata column:

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Key</th>
<th class="text-center" style="width: 20%">Data Type</th>
<th class="text-center" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>schema</code></td>
<td><code>STRING NOT NULL</code></td>
<td>The Avro writer schema string (JSON format) used to serialize the record.</td>
</tr>
</tbody>
</table>

Data Type Mapping
----------------

All Avro types are converted to Flink's VARIANT type. The following Avro logical types receive special handling:

| Avro Type | Avro Logical Type | Variant Representation |
|-----------|-------------------|----------------------|
| INT | date | DATE |
| INT | time-millis | LONG (microseconds) |
| LONG | timestamp-millis | TIMESTAMP |
| LONG | timestamp-micros | TIMESTAMP |
| BYTES/FIXED | decimal | DECIMAL |
| RECORD | - | OBJECT |
| ARRAY | - | ARRAY |
| MAP | - | OBJECT (string keys) |
| UNION (null + type) | - | nullable inner type |
| STRING, ENUM | - | STRING |
| BOOLEAN | - | BOOLEAN |
| INT | - | INT |
| LONG | - | LONG |
| FLOAT | - | FLOAT |
| DOUBLE | - | DOUBLE |
| BYTES, FIXED | - | BYTES |
| NULL | - | NULL |
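
For illustration, the date and timestamp rows above follow Avro's epoch-based encodings: the `date` logical type is an int of days since 1970-01-01, and `timestamp-millis` is a long of milliseconds since the Unix epoch. A sketch of how such raw values decode, using only `java.time` (these helpers are illustrative, not the format's internal code):

```java
import java.time.Instant;
import java.time.LocalDate;

public class AvroLogicalTypesDemo {

    /** Avro `date` logical type: int days since the Unix epoch, surfaced as DATE. */
    static LocalDate decodeDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    /** Avro `timestamp-millis`: long millis since the Unix epoch, surfaced as TIMESTAMP. */
    static Instant decodeTimestampMillis(long millisSinceEpoch) {
        return Instant.ofEpochMilli(millisSinceEpoch);
    }

    public static void main(String[] args) {
        System.out.println(decodeDate(19723));          // 2024-01-01
        System.out.println(decodeTimestampMillis(0L));  // 1970-01-01T00:00:00Z
    }
}
```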

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.registry.confluent;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.AvroVariantDecodingFormat;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL;

/**
* Table format factory for providing configured instances of Confluent Schema Registry Avro to
* Variant {@link DeserializationSchema}. Deserialization only — no serialization support.
*/
@Internal
public class RegistryAvroVariantFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "avro-variant-confluent";
**Contributor** @davidradl (May 5, 2026):

> Could you add docs for this please. A SQL example would be good.
> I assume this will not be that useful until we have more of https://issues.apache.org/jira/browse/FLINK-37922 implemented.

**Contributor Author**:

> Yes, planning to add a doc similar to https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/avro-confluent/, but wanted to get an initial review on naming/code before the documentation.
>
> We have some accessor methods added as part of #27330.
>
> I am also working on Flink + Iceberg integration here: apache/iceberg#15471. This would allow exploding Variant type columns and creating and persisting data to Iceberg tables.

**Contributor Author**:

> Added doc in 440ea57

**Contributor**:

> avro-variant-confluent or avro-confluent-variant? I wonder what your thinking is? Prefixing with avro-confluent for all Avro Confluent formats appeals to me.

**Contributor Author** @swapna267 (May 5, 2026):

> My idea was input-output format, then registry, as we can have any pluggable internal schema registries. For example, there could be avro-variant-customschemaregistry, similar to how the classes are organized.
>
> But I am open to suggestions here.

    private static final int SCHEMA_CACHE_CAPACITY = 1000;

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);

        String schemaRegistryURL = formatOptions.get(URL);
        Map<String, String> optionalPropertiesMap =
                RegistryAvroFormatFactory.buildOptionalPropertiesMap(formatOptions);

        return new AvroVariantDecodingFormat(
                new CachedSchemaCoderProvider(
                        null, schemaRegistryURL, SCHEMA_CACHE_CAPACITY, optionalPropertiesMap),
                SCHEMA_CACHE_CAPACITY);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPERTIES);
        options.add(SSL_KEYSTORE_LOCATION);
        options.add(SSL_KEYSTORE_PASSWORD);
        options.add(SSL_TRUSTSTORE_LOCATION);
        options.add(SSL_TRUSTSTORE_PASSWORD);
        options.add(BASIC_AUTH_CREDENTIALS_SOURCE);
        options.add(BASIC_AUTH_USER_INFO);
        options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
        options.add(BEARER_AUTH_TOKEN);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> forwardOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        options.add(PROPERTIES);
        options.add(SSL_KEYSTORE_LOCATION);
        options.add(SSL_KEYSTORE_PASSWORD);
        options.add(SSL_TRUSTSTORE_LOCATION);
        options.add(SSL_TRUSTSTORE_PASSWORD);
        options.add(BASIC_AUTH_CREDENTIALS_SOURCE);
        options.add(BASIC_AUTH_USER_INFO);
        options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
        options.add(BEARER_AUTH_TOKEN);
        return options;
    }
}
@@ -15,3 +15,4 @@

org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory
org.apache.flink.formats.avro.registry.confluent.RegistryAvroVariantFormatFactory