diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index c946b9e29ce..2bfee92ee8e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -92,7 +92,8 @@ public MySqlPipelineRecordEmitter( super( debeziumDeserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges()); + sourceConfig.isIncludeSchemaChanges(), + false); // Explicitly disable transaction metadata events this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 9cd40f01530..61045726a78 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -132,7 +132,8 @@ public static MySqlSourceBuilder builder() { new MySqlRecordEmitter<>( deserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges())); + sourceConfig.isIncludeSchemaChanges(), + sourceConfig.isIncludeTransactionMetadataEvents())); } MySqlSource( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index caf316d1b4a..fbda06bccd9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -198,6 +198,12 @@ public MySqlSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceBuilder includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents); + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. 
*/ public MySqlSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..c0adf7eb214 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -62,6 +62,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorUpper; private final double distributionFactorLower; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final boolean scanNewlyAddedTableEnabled; private final boolean closeIdleReaders; private final Properties jdbcProperties; @@ -99,6 +100,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents, boolean scanNewlyAddedTableEnabled, boolean closeIdleReaders, Properties dbzProperties, @@ -128,6 +130,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; this.closeIdleReaders = closeIdleReaders; this.dbzProperties = checkNotNull(dbzProperties); @@ -227,6 +230,10 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + public boolean isIncludeTransactionMetadataEvents() { + return includeTransactionMetadataEvents; + } + public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..8715c8b6f61 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable { private double distributionFactorLower = MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; + private boolean includeTransactionMetadataEvents = false; private boolean scanNewlyAddedTableEnabled = false; private boolean closeIdleReaders = false; private Properties jdbcProperties; @@ -235,6 +236,12 @@ public 
MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; @@ -359,6 +366,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not props.setProperty("include.schema.changes", String.valueOf(true)); + // enable transaction metadata if includeTransactionMetadataEvents is true + props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones @@ -412,6 +421,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + includeTransactionMetadataEvents, scanNewlyAddedTableEnabled, closeIdleReaders, props, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 449e7f608f2..02d35380903 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -53,15 +53,18 @@ public class MySqlRecordEmitter implements RecordEmitter debeziumDeserializationSchema; private final MySqlSourceReaderMetrics sourceReaderMetrics; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final OutputCollector outputCollector; public MySqlRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, - boolean includeSchemaChanges) { + boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents) { this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.outputCollector = new OutputCollector<>(); } @@ -102,6 +105,11 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); + } else if (RecordUtils.isTransactionMetadataEvent(element)) { + updateStartingOffsetForSplit(splitState, element); + if (includeTransactionMetadataEvents) { + emitElement(element, output); + } } else { // unknown element LOG.info("Meet unknown element {}, 
just skip.", element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 8ae91ad9dad..6bdad7f3c5c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -73,6 +73,8 @@ private RecordUtils() {} "io.debezium.connector.mysql.SchemaChangeKey"; public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat"; + public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME = + "io.debezium.connector.common.TransactionMetadataKey"; private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader(); /** Converts a {@link ResultSet} row to an array of Objects. */ @@ -339,6 +341,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) { && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name()); } + /** + * Check whether the given source record is a transaction metadata event (BEGIN or END). + * + *
<p>
Transaction events are emitted by Debezium to mark transaction boundaries when + * provide.transaction.metadata is enabled. + */ + public static boolean isTransactionMetadataEvent(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null + && SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + /** * Return the finished snapshot split information. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 5553b1ba8ce..38b54c61f36 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -36,11 +37,17 @@ import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC; import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME; @@ -89,30 +96,370 @@ record -> { private MySqlRecordEmitter createRecordEmitter() { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema() { - @Override - public void deserialize(SourceRecord record, Collector out) { - throw new UnsupportedOperationException(); - } + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + throw new UnsupportedOperationException(); + } - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(Void.class); - } - }, + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Void.class); + } + }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false, false); } + @Test + void testTransactionMetadataEventsDisabledByDefault() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = 
createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated (this should always happen) + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was NOT emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testTransactionMetadataEventsEnabledExplicitly() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-456", 150L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, true); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted (because includeTransactionMetadataEvents=true) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + + @Test + void testTransactionBeginEventHandling() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + @Test + void testTransactionEndEventHandling() throws Exception { + SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) + .isTrue(); + + 
AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEndEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionEndEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + + @Test + void testNonTransactionEventNotDetected() { + Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct value = new Struct(valueSchema).put("op", "c"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord dataRecord = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "test.table", + keySchema, + key, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse(); + } + + @Test + void testTransactionEventWithoutKeySchemaNotDetected() { + Schema valueSchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("status", Schema.STRING_SCHEMA) + .build(); + + Struct value = new Struct(valueSchema).put("status", "BEGIN"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord record = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "transaction.topic", + null, // No key schema + null, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); + } + + @Test + void testMultipleTransactionEventsWithDisabledConfig() throws Exception { + SourceRecord beginEvent = createTransactionMetadataEvent("BEGIN", "tx-789", 300L); + SourceRecord endEvent = createTransactionMetadataEvent("END", "tx-789", 400L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(beginEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(endEvent), + readerOutput, + splitState); + + // Verify offsets were updated but no events were emitted + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify no events were emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testMixedEventsWithTransactionMetadataDisabled() throws 
Exception { + SourceRecord transactionEvent = createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L); + SourceRecord dataEvent = createDataChangeEvent("test.table", 501L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(dataEvent), + readerOutput, + splitState); + + // Verify only data event was emitted (count=1, not 2) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( - new MySqlBinlogSplit( - "binlog-split", - BinlogOffset.ofEarliest(), - BinlogOffset.ofNonStopping(), - Collections.emptyList(), - Collections.emptyMap(), - 0)); + new MySqlBinlogSplit( + "binlog-split", + BinlogOffset.ofEarliest(), + BinlogOffset.ofNonStopping(), + Collections.emptyList(), + Collections.emptyMap(), + 0)); + } + + /** + * Helper method to create a MySqlRecordEmitter that counts emitted records. + */ + private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { + return createRecordEmitterWithTransactionConfig(counter, true); + } + + /** + * Helper method to create a MySqlRecordEmitter with configurable transaction metadata events. + */ + private MySqlRecordEmitter createRecordEmitterWithTransactionConfig(AtomicInteger counter, boolean includeTransactionMetadataEvents) { + return new MySqlRecordEmitter<>( + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + counter.incrementAndGet(); + out.collect("transaction-event"); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + }, + new MySqlSourceReaderMetrics( + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false, + includeTransactionMetadataEvents); + } + + private SourceRecord createTransactionMetadataEvent( + String status, String transactionId, long position) { + Schema keySchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("id", Schema.STRING_SCHEMA) + .build(); + + Schema valueSchema = SchemaBuilder.struct() + .name("io.debezium.connector.common.TransactionMetadataValue") + .field("status", Schema.STRING_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("event_count", Schema.OPTIONAL_INT64_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", transactionId); + + Struct value = new Struct(valueSchema) + .put("status", status) + .put("id", transactionId) + .put("ts_ms", System.currentTimeMillis()); + + if ("END".equals(status)) { + value.put("event_count", 5L); + } + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + offset.put("transaction_id", transactionId); + + return new SourceRecord( + Collections.singletonMap("server", "mysql_binlog_source"), + offset, + "mysql_binlog_source.transaction", + keySchema, + key, + valueSchema, + value); } + + private SourceRecord createDataChangeEvent(String topicName, long position) { + 
Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field("after", SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .optional()) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct after = new Struct(valueSchema.field("after").schema()) + .put("id", 1) + .put("name", "test"); + Struct value = new Struct(valueSchema) + .put("op", "c") + .put("after", after); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + + return new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + topicName, + keySchema, + key, + valueSchema, + value); + } + } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 7b8dfdcdbb2..509e45872a0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -573,7 +573,8 @@ private MySqlSourceReader createReader( : new MySqlRecordEmitter<>( new ForwardDeserializeSchema(), new MySqlSourceReaderMetrics(readerContext.metricGroup()), - configuration.isIncludeSchemaChanges()); + configuration.isIncludeSchemaChanges(), + configuration.isIncludeTransactionMetadataEvents()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); return new MySqlSourceReader<>( @@ -740,7 +741,7 @@ public MysqlLimitedRecordEmitter( MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { - super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges); + super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges;
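Usage sketch (not part of the diff): the snippet below illustrates how a DataStream API job might enable the new flag added by this change. The connection settings, table names, and the JSON deserializer are placeholder assumptions; only `includeTransactionMetadataEvents(...)` comes from this patch, and it defaults to `false` so existing pipelines keep their current behaviour.

```java
// Hypothetical example — hostnames, credentials and table names are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransactionMetadataExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("app_db")
                        .tableList("app_db.orders")
                        .username("cdc_user")
                        .password("secret")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // New in this change: forward Debezium BEGIN/END transaction
                        // markers downstream (internally sets provide.transaction.metadata).
                        // Defaults to false, matching the previous behaviour.
                        .includeTransactionMetadataEvents(true)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();
        env.execute("MySQL CDC with transaction metadata");
    }
}
```

With the flag enabled, the reader emits the Debezium transaction boundary records (status BEGIN/END, transaction id, and for END an event count) to the deserializer alongside data change events; with the flag disabled they are still used to advance the binlog offset but are dropped before emission, as exercised by the tests above.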