[FLINK-39605][Flink-Formats] Add avro variant confluent format #28115

swapna267 wants to merge 1 commit into apache:master
Conversation
```java
 * Variant {@link DeserializationSchema}. Deserialization only — no serialization support.
 */
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {
```
**davidradl:**
Does it make sense to extend the existing format factory so it can pick up the configuration properties and just override the IDENTIFIER and the method?
**swapna267:**
This format factory would need to override all the methods except the utility method `buildOptionalPropertiesMap`. The main differences, to be clear:

- `RegistryAvroFormatFactory` implements both `DeserializationFormatFactory` and `SerializationFormatFactory`.
- Its `createDecodingFormat` fetches the schema, which is not the case here.
- The schema is part of its required options, which is not the case here.

So extending it wouldn't make sense. One improvement I could see is adding some utilities to `AvroConfluentFormatOptions` to centralize all the registry auth config building.
```java
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "avro-variant-confluent";
```
**davidradl:**
Could you add docs for this please? A SQL example would be good. I assume this will not be that useful until we have more of https://issues.apache.org/jira/browse/FLINK-37922 implemented.
**swapna267:**
Yes, I am planning to add a doc similar to https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/avro-confluent/, but wanted to get an initial review on naming/code before the documentation. We have some accessor methods added as part of #27330. I am also working on the Flink + Iceberg integration in apache/iceberg#15471, which would allow exploding Variant-typed columns and creating and persisting data to Iceberg tables.
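To illustrate, a rough sketch of what pulling typed fields back out of a Variant column could look like in SQL. The accessor name `VARIANT_GET` below is a placeholder assumed for illustration, not the actual API added in #27330:

```sql
-- Hypothetical sketch: 'payload' is a VARIANT column populated by this format.
-- VARIANT_GET is an assumed placeholder name; the real accessor functions
-- were added in #27330 and may be named and shaped differently.
SELECT
  CAST(VARIANT_GET(payload, '$.user.id') AS BIGINT)    AS user_id,
  CAST(VARIANT_GET(payload, '$.event.type') AS STRING) AS event_type
FROM events;
```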
```java
 * Variant {@link DeserializationSchema}. Deserialization only — no serialization support.
 */
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {
```
**davidradl:**
The class name starts with Confluent, which is not consistent with the other formats. I suggest renaming to remove the Confluent prefix, as we already know from the package that we are Confluent here.
```java
@Internal
public class ConfluentRegistryAvroVariantFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "avro-variant-confluent";
```
**davidradl:**
avro-variant-confluent or avro-confluent-variant? I wonder what your thinking is. Prefixing all Avro Confluent formats with avro-confluent appeals to me.
**swapna267:**
My idea was input/output format first, then registry, as we can have arbitrary pluggable schema registries. For example, there could be an avro-variant-customschemaregistry, similar to how the classes are organized. But I am open to suggestions here.
```java
 * limitations under the License.
 */

package org.apache.flink.formats.avro;
```
**davidradl:**
A SQL-level test would be good, showing use of the Confluent registry URL with the new schema, and the Variant accessors.
**swapna267:**
Yeah, I thought so, but this would need a locally running Confluent Schema Registry. I didn't see such tests for the avro-confluent format; will explore that more.
**swapna267:**
I see we would need to:

- Have a custom connector that can pass records with varying schemas (as we don't have the Kafka local connector). I see something similar to `TestProtobufTableFactory` - a MockSchemaRegistry HTTP server.
- Include Flink Table / Streaming API dependencies in the flink-formats module.

Let me know if you have any other ideas here, or know of any such existing integration tests. Since we already have test cases covering those code paths through individual tests, do you think we need this integration test here? If the idea is that this test case could act more like documentation or a reference for this use case, would adding those details to the documentation for this specific format help?
**swapna267:** Thanks @davidradl for the review.
What is the purpose of the change
This pull request adds a new avro-variant-confluent deserialization format that reads Avro binary data from Confluent Schema Registry and converts it into Flink's VARIANT type. Unlike the existing avro-confluent format, which requires a fixed reader schema at table creation time, this format uses the writer schema dynamically per record, enabling schema-agnostic ingestion where the Avro schema varies across records.
Example usage:
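A minimal sketch, assuming a Kafka source; the option key `avro-variant-confluent.url` (mirroring `avro-confluent.url`) and the single-VARIANT-column table shape are assumptions for illustration:

```sql
-- Minimal sketch: the whole Avro record is assumed to land in one VARIANT
-- column, and the format's option keys are assumed to mirror avro-confluent.
CREATE TABLE events (
  payload VARIANT
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-variant-confluent',
  'avro-variant-confluent.url' = 'http://localhost:8081'
);
```

Because no reader schema is fixed at table creation time, records written with different Avro schemas can all be ingested into the same VARIANT column.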
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Claude Opus 4.6)