diff --git a/docs/content/docs/connectors/table/formats/debezium.md b/docs/content/docs/connectors/table/formats/debezium.md
index 9e2f60f23a6ab..79e23f849eb1e 100644
--- a/docs/content/docs/connectors/table/formats/debezium.md
+++ b/docs/content/docs/connectors/table/formats/debezium.md
@@ -175,6 +175,8 @@ Available Metadata
The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+Both `debezium-json` and `debezium-avro-confluent` formats support the same metadata fields.
+
Attention Format metadata fields are only available if the
corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
metadata fields for its value format.
@@ -235,6 +237,7 @@ metadata fields for its value format.
The following example shows how to access Debezium metadata fields in Kafka:
```sql
+-- For debezium-json format
CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
@@ -253,6 +256,27 @@ CREATE TABLE KafkaTable (
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
+
+-- For debezium-avro-confluent format
+CREATE TABLE KafkaTable (
+ origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
+ event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+ origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
+ origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
+ origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
+ origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'user_behavior',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'value.format' = 'debezium-avro-confluent',
+ 'debezium-avro-confluent.url' = 'http://localhost:8081'
+);
```
Format Options
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java
new file mode 100644
index 0000000000000..032f2fcca3f28
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+ implements ProjectableDecodingFormat> {
+
+ // ----------------------------------------------------------------------------------------
+ // Mutable attributes
+ // ----------------------------------------------------------------------------------------
+
+ private List metadataKeys;
+
+ // ----------------------------------------------------------------------------------------
+ // Debezium-specific attributes
+ // ----------------------------------------------------------------------------------------
+
+ private final String schemaRegistryURL;
+ private final String schema;
+ private final Map optionalPropertiesMap;
+
+ public DebeziumAvroDecodingFormat(
+ String schemaRegistryURL, String schema, Map optionalPropertiesMap) {
+ this.schemaRegistryURL = schemaRegistryURL;
+ this.schema = schema;
+ this.optionalPropertiesMap = optionalPropertiesMap;
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ @Override
+ public DeserializationSchema createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+ physicalDataType = Projection.of(projections).project(physicalDataType);
+
+ final List readableMetadata =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ final List metadataFields =
+ readableMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList());
+
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+ final TypeInformation producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ return new DebeziumAvroDeserializationSchema(
+ physicalDataType,
+ readableMetadata,
+ producedTypeInfo,
+ schemaRegistryURL,
+ schema,
+ optionalPropertiesMap);
+ }
+
+ @Override
+ public Map listReadableMetadata() {
+ final Map metadataMap = new LinkedHashMap<>();
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Metadata handling
+ // ----------------------------------------------------------------------------------------
+
+ /** List of metadata that can be read with this format. */
+ enum ReadableMetadata {
+ INGESTION_TIMESTAMP(
+ "ingestion-timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ return row;
+ }
+ }),
+
+ SOURCE_TIMESTAMP(
+ "source.timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ SOURCE_FIELD,
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ int pos = SOURCE_PROPERTY_POSITION.get("ts_ms");
+ return row.getField(pos);
+ }
+ }),
+
+ SOURCE_DATABASE(
+ "source.database",
+ DataTypes.STRING().nullable(),
+ SOURCE_FIELD,
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ int pos = SOURCE_PROPERTY_POSITION.get("db");
+ return row.getField(pos);
+ }
+ }),
+
+ SOURCE_SCHEMA(
+ "source.schema",
+ DataTypes.STRING().nullable(),
+ SOURCE_FIELD,
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ int pos = SOURCE_PROPERTY_POSITION.get("schema");
+ return row.getField(pos);
+ }
+ }),
+
+ SOURCE_TABLE(
+ "source.table",
+ DataTypes.STRING().nullable(),
+ SOURCE_FIELD,
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ int pos = SOURCE_PROPERTY_POSITION.get("table");
+ return row.getField(pos);
+ }
+ }),
+
+ SOURCE_PROPERTIES(
+ "source.properties",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+ .nullable(),
+ SOURCE_FIELD,
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int unused) {
+ Map result = new HashMap<>();
+ for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) {
+ Object value = row.getField(i);
+ result.put(
+ StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()),
+ value == null ? null : StringData.fromString(value.toString()));
+ }
+ return new GenericMapData(result);
+ }
+ });
+
+ final String key;
+ final DataType dataType;
+ final DataTypes.Field requiredAvroField;
+ final MetadataConverter converter;
+
+ ReadableMetadata(
+ String key,
+ DataType dataType,
+ DataTypes.Field requiredAvroField,
+ MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.requiredAvroField = requiredAvroField;
+ this.converter = converter;
+ }
+ }
+
+ private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {
+ DataTypes.FIELD("version", DataTypes.STRING()),
+ DataTypes.FIELD("connector", DataTypes.STRING()),
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable()),
+ DataTypes.FIELD("snapshot", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("db", DataTypes.STRING()),
+ DataTypes.FIELD("sequence", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("schema", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("table", DataTypes.STRING()),
+ DataTypes.FIELD("txId", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("scn", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("commit_scn", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("lcr_position", DataTypes.STRING().nullable())
+ };
+
+ private static final Map SOURCE_PROPERTY_POSITION =
+ IntStream.range(0, SOURCE_PROPERTY_FIELDS.length)
+ .boxed()
+ .collect(Collectors.toMap(i -> SOURCE_PROPERTY_FIELDS[i].getName(), i -> i));
+
+ private static final DataTypes.Field SOURCE_FIELD =
+ DataTypes.FIELD("source", DataTypes.ROW(SOURCE_PROPERTY_FIELDS));
+}
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
index 2d22f4ea1b40f..fe51ba7493ceb 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
@@ -25,12 +25,14 @@
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroToRowDataConverters;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDecodingFormat.ReadableMetadata;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
@@ -40,12 +42,14 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory.validateSchemaString;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
* Deserialization schema from Debezium Avro to Flink Table/SQL internal data structure {@link
@@ -83,14 +87,22 @@ public final class DebeziumAvroDeserializationSchema implements DeserializationS
/** TypeInformation of the produced {@link RowData}. */
private final TypeInformation producedTypeInfo;
+ /** Flag that indicates that an additional projection is required for metadata. */
+ private final boolean hasMetadata;
+
+ /** Metadata to be extracted for every record. */
+ private final MetadataConverter[] metadataConverters;
+
public DebeziumAvroDeserializationSchema(
- RowType rowType,
+ DataType physicalDataType,
+ List requestedMetadata,
TypeInformation producedTypeInfo,
String schemaRegistryUrl,
@Nullable String schemaString,
@Nullable Map registryConfigs) {
this.producedTypeInfo = producedTypeInfo;
- RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));
+ RowType debeziumAvroRowType =
+ createDebeziumAvroRowType(physicalDataType, requestedMetadata);
validateSchemaString(schemaString, debeziumAvroRowType);
Schema schema =
@@ -104,6 +116,28 @@ public DebeziumAvroDeserializationSchema(
schema, schemaRegistryUrl, registryConfigs),
AvroToRowDataConverters.createRowConverter(debeziumAvroRowType),
producedTypeInfo);
+
+ this.hasMetadata = requestedMetadata.size() > 0;
+ this.metadataConverters =
+ requestedMetadata.stream()
+ .map(
+ m -> {
+ final int rootPosition =
+ debeziumAvroRowType
+ .getFieldNames()
+ .indexOf(m.requiredAvroField.getName());
+ return (MetadataConverter)
+ (row, pos) -> {
+ Object result = row.getField(rootPosition);
+ if (result instanceof GenericRowData) {
+ result =
+ m.converter.convert(
+ (GenericRowData) result, pos);
+ }
+ return result;
+ };
+ })
+ .toArray(MetadataConverter[]::new);
}
@VisibleForTesting
@@ -112,6 +146,8 @@ public DebeziumAvroDeserializationSchema(
AvroRowDataDeserializationSchema avroDeserializer) {
this.producedTypeInfo = producedTypeInfo;
this.avroDeserializer = avroDeserializer;
+ this.hasMetadata = false;
+ this.metadataConverters = new MetadataConverter[0];
}
@Override
@@ -140,7 +176,7 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti
String op = row.getField(2).toString();
if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
after.setRowKind(RowKind.INSERT);
- out.collect(after);
+ emitRow(row, after, out);
} else if (OP_UPDATE.equals(op)) {
if (before == null) {
throw new IllegalStateException(
@@ -148,15 +184,15 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti
}
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(before);
- out.collect(after);
+ emitRow(row, before, out);
+ emitRow(row, after, out);
} else if (OP_DELETE.equals(op)) {
if (before == null) {
throw new IllegalStateException(
String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
}
before.setRowKind(RowKind.DELETE);
- out.collect(before);
+ emitRow(row, before, out);
} else {
throw new IOException(
format(
@@ -169,6 +205,33 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti
}
}
+    /**
+     * Emits one record, appending the requested metadata columns when any were requested.
+     *
+     * @param rootRow the complete decoded Debezium row that the metadata converters read from
+     * @param physicalRow the {@code before} or {@code after} image; its row kind is copied to the
+     *     produced row
+     * @param out collector that receives the produced row
+     */
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+
+        // produced layout: [physical fields..., metadata fields...]
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos,
+                    metadataConverters[metadataPos].convert(rootRow, metadataPos));
+        }
+
+        out.collect(producedRow);
+    }
+
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
@@ -189,22 +252,41 @@ public boolean equals(Object o) {
}
DebeziumAvroDeserializationSchema that = (DebeziumAvroDeserializationSchema) o;
return Objects.equals(avroDeserializer, that.avroDeserializer)
- && Objects.equals(producedTypeInfo, that.producedTypeInfo);
+ && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+ && hasMetadata == that.hasMetadata;
}
@Override
public int hashCode() {
- return Objects.hash(avroDeserializer, producedTypeInfo);
+ // hashes the same three fields that equals() compares, including hasMetadata
+ return Objects.hash(avroDeserializer, producedTypeInfo, hasMetadata);
}
- public static RowType createDebeziumAvroRowType(DataType databaseSchema) {
- // Debezium Avro contains other information, e.g. "source", "ts_ms"
- // but we don't need them
- return (RowType)
+ /**
+ * Builds the row type that is read from a Debezium Avro record.
+ *
+ * <p>The {@code before}, {@code after} and {@code op} fields are always part of the result.
+ * Debezium Avro contains other information, e.g. {@code source} and {@code ts_ms}; such a field
+ * is only appended when one of the given metadata entries requires it, and each required field
+ * is appended once even if several entries share it.
+ *
+ * @param databaseSchema data type of the physical row, used for {@code before} and {@code after}
+ * @param readableMetadata metadata whose required Avro fields are appended; may be empty
+ * @return the row type of the Debezium Avro payload to read
+ */
+ public static RowType createDebeziumAvroRowType(
+ DataType databaseSchema, List<ReadableMetadata> readableMetadata) {
+ DataType payload =
DataTypes.ROW(
- DataTypes.FIELD("before", databaseSchema.nullable()),
- DataTypes.FIELD("after", databaseSchema.nullable()),
- DataTypes.FIELD("op", DataTypes.STRING()))
- .getLogicalType();
+ DataTypes.FIELD("before", databaseSchema.nullable()),
+ DataTypes.FIELD("after", databaseSchema.nullable()),
+ DataTypes.FIELD("op", DataTypes.STRING()));
+
+ // append fields that are required for reading metadata in the payload
+ final List<DataTypes.Field> payloadMetadataFields =
+ readableMetadata.stream()
+ .map(m -> m.requiredAvroField)
+ .distinct()
+ .collect(Collectors.toList());
+ payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
+
+ return (RowType) payload.getLogicalType();
+ }
+
+ /**
+ * Converter that extracts a metadata field from the row payload that comes out of the Avro
+ * schema and converts it to the desired data type.
+ *
+ * <p>Extends {@link Serializable}; the deserialization schema keeps an array of converters as a
+ * field.
+ */
+ interface MetadataConverter extends Serializable {
+ /**
+ * Extracts one metadata value.
+ *
+ * @param row the row to read from
+ * @param pos position argument; {@code emitRow} passes the index of the metadata column
+ * among the requested metadata, and the converters declared in {@code ReadableMetadata}
+ * ignore it
+ * @return the metadata value to place into the produced row
+ */
+ Object convert(GenericRowData row, int pos);
}
}
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
index 5925f23f76325..08fece5a7e18d 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
@@ -21,18 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
@@ -84,34 +80,7 @@ public DecodingFormat> createDecodingFormat(
String schema = formatOptions.getOptional(SCHEMA).orElse(null);
Map optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
- return new ProjectableDecodingFormat>() {
- @Override
- public DeserializationSchema createRuntimeDecoder(
- DynamicTableSource.Context context,
- DataType producedDataType,
- int[][] projections) {
- producedDataType = Projection.of(projections).project(producedDataType);
- final RowType rowType = (RowType) producedDataType.getLogicalType();
- final TypeInformation producedTypeInfo =
- context.createTypeInformation(producedDataType);
- return new DebeziumAvroDeserializationSchema(
- rowType,
- producedTypeInfo,
- schemaRegistryURL,
- schema,
- optionalPropertiesMap);
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .addContainedKind(RowKind.DELETE)
- .build();
- }
- };
+ return new DebeziumAvroDecodingFormat(schemaRegistryURL, schema, optionalPropertiesMap);
}
@Override
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
index e7a263ab42123..cab6ffde11c99 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
@@ -36,6 +36,7 @@
import javax.annotation.Nonnull;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -129,19 +130,20 @@ void testSeDeSchema() {
DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
- ROW_TYPE,
+ SCHEMA.toPhysicalRowDataType(),
+ Collections.emptyList(),
InternalTypeInfo.of(ROW_TYPE),
REGISTRY_URL,
null,
registryConfigs);
DeserializationSchema actualDeser = createDeserializationSchema(options);
- assertEquals(expectedDeser, actualDeser);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, null, registryConfigs);
SerializationSchema actualSer = createSerializationSchema(options);
- assertEquals(expectedSer, actualSer);
+ assertThat(actualSer).isEqualTo(expectedSer);
}
@Test
@@ -153,19 +155,20 @@ public void testSeDeSchemaWithSchemaOption() {
DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
- ROW_TYPE,
+ SCHEMA.toPhysicalRowDataType(),
+ Collections.emptyList(),
InternalTypeInfo.of(ROW_TYPE),
REGISTRY_URL,
AVRO_SCHEMA,
registryConfigs);
DeserializationSchema actualDeser = createDeserializationSchema(options);
assertThat(actualDeser).isEqualTo(expectedDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, AVRO_SCHEMA, registryConfigs);
SerializationSchema actualSer = createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
}
@Test
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
index 2640b50e44c59..276a02ddc4ae7 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
@@ -27,14 +27,18 @@
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDecodingFormat.ReadableMetadata;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.SimpleUserCodeClassLoader;
@@ -54,6 +58,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -88,7 +93,7 @@ void testSerializationDeserialization() throws Exception {
RowType rowTypeDe =
DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
- fromLogicalToDataType(rowType));
+ fromLogicalToDataType(rowType), Collections.emptyList());
RowType rowTypeSe =
DebeziumAvroSerializationSchema.createDebeziumAvroRowType(
fromLogicalToDataType(rowType));
@@ -146,10 +151,42 @@ void testDeleteDataDeserialization() throws Exception {
assertThat(actual).isEqualTo(expected);
}
+ // Deserializes "debezium-avro-insert.avro" and checks the four physical columns (indices 0-3)
+ // followed by the metadata columns (indices 4-9) of the single produced row.
+ @Test
+ void testDeserializationWithMetadata() throws Exception {
+ testDeserializationWithMetadata(
+ "debezium-avro-insert.avro",
+ row -> {
+ // Physical columns
+ assertThat(row.getLong(0)).isEqualTo(1L);
+ assertThat(row.getString(1).toString()).isEqualTo("lisi");
+ assertThat(row.getString(2).toString()).isEqualTo("test debezium avro data");
+ assertThat(row.getDouble(3)).isEqualTo(21.799999237060547);
+
+ // Metadata: ingestion-timestamp (field index 4)
+ assertThat(row.getTimestamp(4, 3)).isNotNull();
+
+ // Metadata: source.timestamp (field index 5)
+ assertThat(row.getTimestamp(5, 3)).isNotNull();
+
+ // Metadata: source.database (field index 6)
+ assertThat(row.getString(6).toString()).isEqualTo("test1");
+
+ // Metadata: source.schema (field index 7) is deliberately not asserted:
+ // it may be null for MySQL, which doesn't have a schema field in source
+
+ // Metadata: source.table (field index 8)
+ assertThat(row.getString(8).toString()).isEqualTo("person");
+
+ // Metadata: source.properties (field index 9)
+ assertThat(row.getMap(9)).isNotNull();
+ assertThat(row.getMap(9).size()).isGreaterThan(0);
+ });
+ }
+
+
public List testDeserialization(String dataPath) throws Exception {
RowType rowTypeDe =
DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
- fromLogicalToDataType(rowType));
+ fromLogicalToDataType(rowType), Collections.emptyList());
client.register(SUBJECT, DEBEZIUM_SCHEMA_COMPATIBLE_TEST, 1, 81);
@@ -164,6 +201,36 @@ public List testDeserialization(String dataPath) throws Exception {
return collector.list.stream().map(Object::toString).collect(Collectors.toList());
}
+    /**
+     * Deserializes the Avro file at {@code dataPath} with every {@link ReadableMetadata} requested
+     * and passes the single produced row to {@code testConsumer}.
+     *
+     * <p>NOTE(review): the deserializer below is created with the two-argument test constructor,
+     * which in this change sets {@code hasMetadata = false} and installs no metadata converters.
+     * Rows would then be emitted without the metadata columns - confirm that the requested
+     * metadata is actually wired into the deserializer under test.
+     *
+     * @param dataPath resource file holding one Debezium Avro record
+     * @param testConsumer assertions to run against the produced row
+     */
+    private void testDeserializationWithMetadata(String dataPath, Consumer<RowData> testConsumer)
+            throws Exception {
+        final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
+
+        // produced type: physical row followed by one column per requested metadata key
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(
+                        fromLogicalToDataType(rowType),
+                        requestedMetadata.stream()
+                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                                .collect(Collectors.toList()));
+
+        RowType rowTypeDe =
+                DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
+                        fromLogicalToDataType(rowType), requestedMetadata);
+
+        client.register(SUBJECT, DEBEZIUM_SCHEMA_COMPATIBLE_TEST, 1, 81);
+
+        DebeziumAvroDeserializationSchema dbzDeserializer =
+                new DebeziumAvroDeserializationSchema(
+                        InternalTypeInfo.of(producedDataType.getLogicalType()),
+                        getDeserializationSchema(rowTypeDe));
+        dbzDeserializer.open(new MockInitializationContext());
+
+        SimpleCollector collector = new SimpleCollector();
+        dbzDeserializer.deserialize(readBytesFromFile(dataPath), collector);
+
+        assertThat(collector.list).hasSize(1);
+        assertThat(collector.list.get(0)).satisfies(testConsumer);
+    }
+
+
private AvroRowDataDeserializationSchema getDeserializationSchema(RowType rowType) {
final ConfluentSchemaRegistryCoder registryCoder =