24 changes: 24 additions & 0 deletions docs/content/docs/connectors/table/formats/debezium.md
@@ -175,6 +175,8 @@ Available Metadata

The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.

Both `debezium-json` and `debezium-avro-confluent` formats support the same metadata fields.

<span class="label label-danger">Attention</span> Format metadata fields are only available if the
corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
metadata fields for its value format.
@@ -235,6 +237,7 @@ metadata fields for its value format.
The following example shows how to access Debezium metadata fields in Kafka:

```sql
-- For debezium-json format
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
@@ -253,6 +256,27 @@ CREATE TABLE KafkaTable (
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

-- For debezium-avro-confluent format
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

Format Options
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.registry.confluent.debezium;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/** {@link DecodingFormat} for Debezium using Avro encoding. */
public class DebeziumAvroDecodingFormat
        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {

    // ----------------------------------------------------------------------------------------
    // Mutable attributes
    // ----------------------------------------------------------------------------------------

    private List<String> metadataKeys;

    // ----------------------------------------------------------------------------------------
    // Debezium-specific attributes
    // ----------------------------------------------------------------------------------------

    private final String schemaRegistryURL;
    private final String schema;
    private final Map<String, ?> optionalPropertiesMap;

    public DebeziumAvroDecodingFormat(
            String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) {
        this.schemaRegistryURL = schemaRegistryURL;
        this.schema = schema;
        this.optionalPropertiesMap = optionalPropertiesMap;
        this.metadataKeys = Collections.emptyList();
    }

    @Override
    public DeserializationSchema<RowData> createRuntimeDecoder(
            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
        physicalDataType = Projection.of(projections).project(physicalDataType);

        final List<ReadableMetadata> readableMetadata =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .collect(Collectors.toList());
        final List<DataTypes.Field> metadataFields =
                readableMetadata.stream()
                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
                        .collect(Collectors.toList());

        final DataType producedDataType =
                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        return new DebeziumAvroDeserializationSchema(
                physicalDataType,
                readableMetadata,
                producedTypeInfo,
                schemaRegistryURL,
                schema,
                optionalPropertiesMap);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys) {
        this.metadataKeys = metadataKeys;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    // ----------------------------------------------------------------------------------------
    // Metadata handling
    // ----------------------------------------------------------------------------------------

    /** List of metadata that can be read with this format. */
    enum ReadableMetadata {


debezium-json exposes a seventh metadata key, schema (the inline Connect schema), which isn't in this enum. I'm assuming that's intentional because the Confluent format carries the schema in the registry rather than inline, so there's nothing to expose. Is that the reasoning, or is it just out of scope for this PR?

Author


Yes, intentional.

The debezium-json format exposes the inline schema field because it is embedded in each JSON message. However, the Confluent Avro format stores schemas in the registry and messages only carry the schema ID. Since there is no inline schema field in the Avro payload itself, there is nothing to expose as metadata.

        INGESTION_TIMESTAMP(

Minor: this inner convert returning row looks unreachable. The wrapper in DebeziumAvroDeserializationSchema only calls m.converter.convert(...) when the resolved field is a GenericRowData, and the top-level ts_ms is a timestamp scalar, so this branch never fires. Could drop the body to a no-op/null to avoid implying it runs.

                "ingestion-timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
                DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        return row;
                    }
                }),

        SOURCE_TIMESTAMP(
                "source.timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
                SOURCE_FIELD,
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        int pos = SOURCE_PROPERTY_POSITION.get("ts_ms");
                        return row.getField(pos);
                    }
                }),

        SOURCE_DATABASE(
                "source.database",
                DataTypes.STRING().nullable(),
                SOURCE_FIELD,
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        int pos = SOURCE_PROPERTY_POSITION.get("db");
                        return row.getField(pos);
                    }
                }),

        SOURCE_SCHEMA(
                "source.schema",
                DataTypes.STRING().nullable(),
                SOURCE_FIELD,
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        int pos = SOURCE_PROPERTY_POSITION.get("schema");
                        return row.getField(pos);
                    }
                }),

        SOURCE_TABLE(
                "source.table",
                DataTypes.STRING().nullable(),
                SOURCE_FIELD,
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        int pos = SOURCE_PROPERTY_POSITION.get("table");
                        return row.getField(pos);
                    }
                }),

        SOURCE_PROPERTIES(
                "source.properties",
                // key and value of the map are nullable to make handling easier in queries
                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
                        .nullable(),
                SOURCE_FIELD,
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int unused) {
                        Map<StringData, StringData> result = new HashMap<>();
                        for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) {
                            Object value = row.getField(i);
                            result.put(
                                    StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()),
                                    value == null ? null : StringData.fromString(value.toString()));
                        }
                        return new GenericMapData(result);
                    }
                });

        final String key;
        final DataType dataType;
        final DataTypes.Field requiredAvroField;
        final MetadataConverter converter;

        ReadableMetadata(
                String key,
                DataType dataType,
                DataTypes.Field requiredAvroField,
                MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.requiredAvroField = requiredAvroField;
            this.converter = converter;
        }
    }

    private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {


debezium-json models source as a connector-agnostic MAP<STRING, STRING> (DebeziumJsonDecodingFormat's SOURCE_* entries all use DataTypes.FIELD("source", DataTypes.MAP(...))), so it reads whatever fields a given Debezium connector emits. Here source is a fixed-order ROW of 13 named fields, and several (scn, commit_scn, lcr_position) are Oracle/XStream-specific while schema is absent from MySQL's source.

Since AvroToRowDataConverters.createRowConverter reads positionally (record.get(i)), this reader ROW has to line up — via Avro name resolution — with the actual writer schema in the registry. Different connectors (MySQL: server_id/gtid/file/pos; Postgres: lsn/xmin; MongoDB: different again) emit different source shapes. How do you see this behaving when the registered schema's source doesn't match this list — does Avro resolution fill the missing fields as null, or does it reject? I'm wondering whether mirroring the JSON side's flexible map would sidestep the per-connector coupling entirely. Asking rather than asserting here — you may have a constraint I'm not seeing.

Author


I looked into using MAP like debezium-json, but hit a constraint:

Debezium's source is an Avro record type (not map). Flink's AvroToRowDataConverters maps Avro types to Flink types — map → MAP, record → ROW. There's no record → MAP converter.

If we define source as MAP in the Flink schema, AvroSchemaConverter will generate an Avro map schema, but the registered schema has source as a record type. Under Avro's schema resolution rules a record and a map are not compatible, so this should fail during deserialization.

To make it connector-agnostic, two options:

  1. Add record → MAP converter to AvroToRowDataConverters — extracts fields from runtime schema dynamically. Clean, but touches flink-avro core.

  2. Handle source in DebeziumAvroDeserializationSchema — keep as GenericRecord, convert to MAP using runtime schema. No core changes, but bypasses standard layer.

Which fits better with Flink's direction?


Thanks for digging into this — the record-vs-map constraint is real and correctly diagnosed. The converter switch only wires record → ROW (AvroToRowDataConverters.java:148-149) and map → MAP (:150-152), so declaring source as MAP would have AvroSchemaConverter build a valid Avro map schema that then can't resolve against the registry's record — per Avro's resolution rules a writer record and a reader map aren't compatible, so it fails at decode. Good catch.

To close the open question from my first comment (mismatched source → null or reject): it resolves by name — RegistryAvroDeserializationSchema sets both writer and reader schema on the datum reader (RegistryAvroDeserializationSchema.java:102-103), so the record comes back in reader order and the positional reads stay aligned. Resolution then splits by nullability: the connector-specific fields are nullable → withDefault(null) (AvroSchemaConverter.java:554-557), so scn/commit_scn/lcr_position come back null when a connector omits them; but version/connector/name/db/table are non-nullable → noDefault(), and a default-less reader field the writer lacks makes Avro throw and fail the whole message. So the fixed ROW's real limitation is that it can't surface fields it doesn't list (MySQL gtid, Postgres lsn) — and, latently, the non-nullable version/connector/name/db/table would hard-fail any connector whose source doesn't carry those exact names. A connector-agnostic map sidesteps both.
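
To make the name-based resolution concrete, here is a minimal plain-Java model of the rule. It is not Avro's or Flink's actual code: `SourceResolutionSketch`, `ReaderField`, and `resolve` are hypothetical names, a `Map` stands in for the writer record, and a boolean stands in for the `withDefault(null)` / `noDefault()` distinction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of record resolution by name; hypothetical, not Avro or Flink code. */
final class SourceResolutionSketch {

    /** Hypothetical stand-in for a reader field: a name plus whether it has a null default. */
    static final class ReaderField {
        final String name;
        final boolean hasNullDefault;

        ReaderField(String name, boolean hasNullDefault) {
            this.name = name;
            this.hasNullDefault = hasNullDefault;
        }
    }

    /**
     * Resolves a writer record against the reader fields by name. The result follows
     * reader order, so positional reads line up with the reader ROW.
     */
    static List<Object> resolve(Map<String, Object> writerRecord, List<ReaderField> readerFields) {
        List<Object> resolved = new ArrayList<>();
        for (ReaderField field : readerFields) {
            if (writerRecord.containsKey(field.name)) {
                resolved.add(writerRecord.get(field.name));
            } else if (field.hasNullDefault) {
                // Reader field absent from the writer, but it has a default: use it.
                resolved.add(null);
            } else {
                // Reader field absent from the writer and no default: resolution fails.
                throw new IllegalStateException("No default for missing field: " + field.name);
            }
        }
        // Writer-only fields (for example gtid) are never copied, so they are dropped.
        return resolved;
    }

    public static void main(String[] args) {
        List<ReaderField> reader =
                Arrays.asList(
                        new ReaderField("db", false),
                        new ReaderField("table", false),
                        new ReaderField("scn", true));
        Map<String, Object> mysqlLike = new LinkedHashMap<>();
        mysqlLike.put("gtid", "g-1");
        mysqlLike.put("table", "orders");
        mysqlLike.put("db", "inventory");
        System.out.println(resolve(mysqlLike, reader));
    }
}
```

With a MySQL-like writer record that carries `gtid` but no `scn`, `resolve` drops `gtid`, fills `scn` with null, and returns the values in reader order. Removing `db` from the writer makes it throw, which mirrors the hard-fail case for the non-nullable fields.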

I'd lean to option (2). Both options stringify the source fields into a MAP<STRING,STRING> — that lossiness is inherent to record→map either way — so the deciding factor is blast radius: (1) puts an opinionated record→map coercion into shared AvroToRowDataConverters, where any Avro user declaring a MAP target over a record field would hit it implicitly, to serve a Debezium-specific need; (2) keeps it local and explicit. And the registered source schema isn't available at planning time anyway (the schema option is the optional user override, normally null; the real one resolves from the registry by ID at runtime), so a runtime handler in DebeziumAvroDeserializationSchema is its natural home.

One note if you take (2): to actually gain agnosticism it should build the map from the connector's writer source record — before it's projected onto the 13-field reader ROW — otherwise the connector-only fields are already dropped and it's just the same fixed list re-shaped as a map. The typed accessors (source.database, …) could then key into that map by name, and the positional SOURCE_PROPERTY_POSITION assumption goes away.
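
A rough plain-Java sketch of that ordering point, with a `Map` standing in for the Avro writer record. Every name here (`WriterSourceMapSketch`, `toStringMap`, `projectOnto`, `database`) is made up for illustration and is not part of this PR or of Flink.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: build the source map before vs. after projection. */
final class WriterSourceMapSketch {

    /** Stringifies every field of the given source record, whatever the connector emitted. */
    static Map<String, String> toStringMap(Map<String, Object> source) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            Object value = entry.getValue();
            result.put(entry.getKey(), value == null ? null : value.toString());
        }
        return result;
    }

    /** Keeps only the fields a fixed reader list names, as projection onto a reader ROW would. */
    static Map<String, Object> projectOnto(
            Map<String, Object> writerSource, List<String> readerFieldNames) {
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String name : readerFieldNames) {
            projected.put(name, writerSource.get(name));
        }
        return projected;
    }

    /** Typed accessor in the style of source.database: a lookup by name, not by position. */
    static String database(Map<String, String> sourceMap) {
        return sourceMap.get("db");
    }

    public static void main(String[] args) {
        Map<String, Object> writerSource = new LinkedHashMap<>();
        writerSource.put("db", "inventory");
        writerSource.put("table", "orders");
        writerSource.put("gtid", "g-1");

        // Built from the writer record: connector-only fields such as gtid survive.
        Map<String, String> early = toStringMap(writerSource);
        // Built after projection onto a fixed list: gtid is already gone.
        Map<String, String> late =
                toStringMap(projectOnto(writerSource, Arrays.asList("db", "table", "scn")));

        System.out.println(early.containsKey("gtid"));
        System.out.println(late.containsKey("gtid"));
        System.out.println(database(early));
    }
}
```

The map built from the writer record keeps `gtid`, while the one built after projection only re-shapes the fixed list. `database` reads by key, so it does not depend on field order.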

On scope, my read is that the current typed set is enough to land this PR for the initial parity — assuming the supported connectors are documented — with the connector-agnostic map as a follow-up; that keeps the change focused and unblocks the relational connectors now. The final scope and merge call is of course the committers' to make.

        DataTypes.FIELD("version", DataTypes.STRING()),
        DataTypes.FIELD("connector", DataTypes.STRING()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable()),
        DataTypes.FIELD("snapshot", DataTypes.STRING().nullable()),
        DataTypes.FIELD("db", DataTypes.STRING()),
        DataTypes.FIELD("sequence", DataTypes.STRING().nullable()),
        DataTypes.FIELD("schema", DataTypes.STRING().nullable()),
        DataTypes.FIELD("table", DataTypes.STRING()),
        DataTypes.FIELD("txId", DataTypes.STRING().nullable()),
        DataTypes.FIELD("scn", DataTypes.STRING().nullable()),
        DataTypes.FIELD("commit_scn", DataTypes.STRING().nullable()),
        DataTypes.FIELD("lcr_position", DataTypes.STRING().nullable())
    };

    private static final Map<String, Integer> SOURCE_PROPERTY_POSITION =
            IntStream.range(0, SOURCE_PROPERTY_FIELDS.length)
                    .boxed()
                    .collect(Collectors.toMap(i -> SOURCE_PROPERTY_FIELDS[i].getName(), i -> i));

    private static final DataTypes.Field SOURCE_FIELD =
            DataTypes.FIELD("source", DataTypes.ROW(SOURCE_PROPERTY_FIELDS));
}