
[FLINK-39605][Flink-Formats] Add avro variant confluent format #28115

Open
swapna267 wants to merge 1 commit into apache:master from swapna267:FLINK-39605

Conversation

@swapna267 (Contributor)

What is the purpose of the change

This pull request adds a new avro-variant-confluent deserialization format that reads Avro binary data from Confluent Schema Registry and converts it into Flink's VARIANT type. Unlike the existing avro-confluent format, which requires a fixed reader schema at table creation time, this format uses the writer schema dynamically per record, enabling schema-agnostic ingestion where the Avro schema varies across records.

Example usage:

    CREATE TABLE kafka_source (
        data VARIANT,
        avro_schema STRING METADATA FROM 'schema'
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'server1',
        'format' = 'avro-variant-confluent',
        'avro-variant-confluent.url' = ''
    );
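
For context, the per-record schema resolution described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: it assumes the standard Confluent wire format (magic byte 0 followed by a 4-byte schema id), resolves the writer schema through the Confluent SchemaRegistryClient, and uses that schema as both writer and reader schema so no projection or evolution is applied.

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    import java.nio.ByteBuffer;

    // Sketch only: the real format wires this logic into a DeserializationSchema.
    class WriterSchemaDecodeSketch {
        private final SchemaRegistryClient client;

        WriterSchemaDecodeSketch(SchemaRegistryClient client) {
            this.client = client;
        }

        GenericRecord deserialize(byte[] message) throws Exception {
            ByteBuffer buffer = ByteBuffer.wrap(message);
            if (buffer.get() != 0) { // Confluent wire format starts with magic byte 0
                throw new IllegalArgumentException("Unknown magic byte");
            }
            int schemaId = buffer.getInt(); // 4-byte schema id follows the magic byte
            // A real implementation would cache this lookup per schema id.
            Schema writerSchema =
                    new Schema.Parser().parse(client.getSchemaById(schemaId).canonicalString());
            // The writer schema doubles as the reader schema, so all fields are kept as-is.
            GenericDatumReader<GenericRecord> reader =
                    new GenericDatumReader<>(writerSchema, writerSchema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                    message, buffer.position(), message.length - buffer.position(), null);
            return reader.read(null, decoder);
        }
    }

Because no fixed reader schema is baked into the table, records written with different schemas can flow through the same VARIANT column.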

Brief change log

  • Added RegistryWriterAvroDeserializationSchema that deserializes Avro binary using the writer schema as both reader and writer, preserving all fields as-is
  • Added AvroVariantDeserializationSchema that converts GenericRecord to Variant with an LRU converter cache keyed by writer schema (a sketch of this cache follows the list)
  • Added AvroVariantDecodingFormat with support for optional schema metadata column exposing the writer schema string
  • Added ConfluentRegistryAvroVariantFormatFactory as the SPI entry point (identifier: avro-variant-confluent), deserialization only
  • Added test utility fixedRoundRobinSchemaCoderProvider for testing multi-schema scenarios
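
The converter cache from the second item can be pictured as follows. This is a minimal sketch, assuming a LinkedHashMap-based LRU keyed by the Avro writer schema; the size bound and the generic converter type are invented for illustration and are not taken from the PR.

    import org.apache.avro.Schema;

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Sketch of an LRU cache that builds one converter per distinct writer schema.
    final class ConverterCacheSketch<C> {
        private static final int MAX_ENTRIES = 100; // assumed bound

        // An access-ordered LinkedHashMap gives least-recently-used eviction.
        private final Map<Schema, C> cache =
                new LinkedHashMap<Schema, C>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<Schema, C> eldest) {
                        return size() > MAX_ENTRIES;
                    }
                };

        private final Function<Schema, C> converterFactory;

        ConverterCacheSketch(Function<Schema, C> converterFactory) {
            this.converterFactory = converterFactory;
        }

        /** Returns the cached converter for a writer schema, building it on a miss. */
        C converterFor(Schema writerSchema) {
            return cache.computeIfAbsent(writerSchema, converterFactory);
        }
    }

Keying by the writer schema means a converter is built once per distinct schema and reused for every later record carrying that schema.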

Verifying this change

This change added tests and can be verified as follows:

  • AvroVariantDeserializationSchemaTest — parameterized deserialization with/without schema metadata, converter cache eviction, null message handling
  • RegistryWriterAvroDeserializationSchemaTest — verifies writer-schema-preserving deserialization of GenericRecord
  • ConfluentRegistryAvroVariantFormatFactoryTest — SPI discovery, optional SSL/auth properties, missing URL validation, schema metadata column via format readable metadata
  • RegistryAvroVariantDeserializationSchemaTest — end-to-end serialize (RowData→Avro) then deserialize (Avro→Variant) with MockSchemaRegistryClient, schema evolution across v1/v2, parameterized with/without schema metadata (a sketch of the wire-format setup this relies on follows the list)
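
For the last test above, input bytes can be produced against MockSchemaRegistryClient without a real registry, roughly like this (a rough sketch; the helper name and subject handling are assumptions, not the PR's test code):

    import io.confluent.kafka.schemaregistry.avro.AvroSchema;
    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    // Builds a Confluent wire-format message for a record of any schema.
    final class WireFormatTestSketch {
        static byte[] toWireFormat(
                MockSchemaRegistryClient client, String subject, GenericRecord record)
                throws Exception {
            Schema schema = record.getSchema();
            // Registering the writer schema yields the id embedded in the message.
            int schemaId = client.register(subject, new AvroSchema(schema));

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0); // magic byte
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4-byte schema id
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }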

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (new deserialization path for VARIANT type)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs; will add to the Flink docs (similar to the avro-confluent format page) once this PR is reviewed and ready.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code (Claude Opus 4.6)

@flinkbot (Collaborator) commented May 5, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

* Variant {@link DeserializationSchema}. Deserialization only — no serialization support.
*/
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {
@davidradl (Contributor) commented May 5, 2026

Does it make sense to extend the existing format factory so it can pick up the configuration properties and just override the IDENTIFIER and the method?

@swapna267 (Contributor, Author)

This format factory would need to override all the methods except the utility method buildOptionalPropertiesMap.

The main differences, to be clear:

  1. RegistryAvroFormatFactory implements both DeserializationFormatFactory and SerializationFormatFactory
  2. createDecodingFormat fetches the schema, which is not the case here.
  3. The schema is part of the required options, which is not the case here.

So extending it wouldn't make sense.

One improvement I could see is adding some utilities to AvroConfluentFormatOptions to centralize all the registry auth config building.
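
For illustration, such a utility could look roughly like the sketch below. The class and method are hypothetical; the option constants exist in AvroConfluentFormatOptions today, but the property-key strings shown are assumptions and only a few of the SSL/auth options are included.

    import org.apache.flink.configuration.ReadableConfig;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper to centralize registry client property building.
    final class RegistryClientProperties {
        private RegistryClientProperties() {}

        static Map<String, Object> fromFormatOptions(ReadableConfig formatOptions) {
            Map<String, Object> properties = new HashMap<>();
            formatOptions
                    .getOptional(AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION)
                    .ifPresent(v -> properties.put("schema.registry.ssl.keystore.location", v));
            formatOptions
                    .getOptional(AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE)
                    .ifPresent(v -> properties.put("basic.auth.credentials.source", v));
            formatOptions
                    .getOptional(AvroConfluentFormatOptions.BEARER_AUTH_TOKEN)
                    .ifPresent(v -> properties.put("bearer.auth.token", v));
            // ... remaining SSL/auth options follow the same pattern
            return properties;
        }
    }

Both the existing avro-confluent factory and the new variant factory could then delegate here instead of duplicating the mapping.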

@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {

public static final String IDENTIFIER = "avro-variant-confluent";
@davidradl (Contributor) commented May 5, 2026

Could you add docs for this, please? A SQL example would be good.
I assume this will not be that useful until we have more of https://issues.apache.org/jira/browse/FLINK-37922 implemented.

@swapna267 (Contributor, Author)

Yes, I'm planning to add a doc similar to https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/avro-confluent/,
but wanted to get an initial review on naming/code before writing the documentation.

We have some accessor methods added as part of #27330.

I am also working on the Flink + Iceberg integration in apache/iceberg#15471.
This would allow exploding Variant type columns and creating and persisting data to Iceberg tables.

* Variant {@link DeserializationSchema}. Deserialization only — no serialization support.
*/
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {
Contributor

The class name starts with Confluent. This is not consistent with the other formats. I suggest renaming to remove the Confluent prefix, as we already know from the package that we are Confluent here.

@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {

public static final String IDENTIFIER = "avro-variant-confluent";
Contributor

avro-variant-confluent or avro-confluent-variant? I wonder what your thinking is. Prefixing all Avro Confluent formats with avro-confluent appeals to me.

@swapna267 (Contributor, Author) commented May 5, 2026

My idea was input-output format, then registry, since we can have pluggable internal schema registries.
For example, there could be avro-variant-customschemaregistry, similar to how the classes are organized.

But I am open to suggestions here.

* limitations under the License.
*/

package org.apache.flink.formats.avro;
Contributor

A SQL-level test would be good, showing use of the Confluent registry URL with the new schema, and the variant accessors.

@swapna267 (Contributor, Author) commented May 5, 2026

Yeah, I thought so, but this would need a locally running Confluent Schema Registry. I didn't see such tests for the avro-confluent format. Will explore more on that.

@swapna267 (Contributor, Author)

I see we would need to:

  1. Have a custom connector that can pass records of varying schemas (as we don't have the Kafka local connector). I see something similar in TestProtobufTableFactory.
  2. A MockSchemaRegistry HTTP server.
  3. Include Flink Table / Streaming API dependencies in the flink-formats module.

Let me know if you have any other ideas here or any such existing integration tests.
As we already have test cases covering those code paths through individual tests, do you think we need this integration test here?

If the idea is that this test would act more as documentation or a reference for this use case, would adding those details to the documentation for this specific format help?

@swapna267 (Contributor, Author)

Thanks @davidradl for the review.
