Description
The ContainerProperties#setTransactionManager method was deprecated in favor of ContainerProperties#setKafkaAwareTransactionManager, which accepts only a KafkaAwareTransactionManager.
However, the transactionManager plays an important role when implementing Idempotent Receiver patterns that rely on storing processed message IDs in a SQL database to ensure consistency and prevent duplicate processing.
See Spring Kafka documentation: Filtering Messages
Implementation
A common approach to building an idempotent receiver involves coupling message consumption and database operations within the same local transaction:
- Configure a JDBC TransactionManager using ContainerProperties#setTransactionManager.
  → The container opens a database transaction for each processed record.
- Implement a RecordFilterStrategy that checks whether the incoming message ID already exists in the database; if it does, the record is filtered out.
- Because both the @KafkaListener and the RecordFilterStrategy execute in the same transaction, the business data and the idempotent consumer record are written atomically.
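The flow above can be illustrated with a minimal, dependency-free sketch. It does not use Spring Kafka or JDBC: InMemoryDb and deliver are hypothetical stand-ins for the SQL database and for the container's per-record invocation, chosen only to show why the duplicate check, the business write, and the processed-ID write need to share one transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class IdempotentReceiverSketch {

    /** Hypothetical stand-in for a SQL database with a single local transaction. */
    static class InMemoryDb {
        private final Set<String> processedIds = new HashSet<>();
        private final List<String> businessRows = new ArrayList<>();
        private final Set<String> pendingIds = new HashSet<>();
        private final List<String> pendingRows = new ArrayList<>();

        void begin() { pendingIds.clear(); pendingRows.clear(); }

        /** Makes the pending business rows and processed IDs visible together. */
        void commit() {
            processedIds.addAll(pendingIds);
            businessRows.addAll(pendingRows);
            begin();
        }

        /** Discards everything written since begin(). */
        void rollback() { begin(); }

        boolean isProcessed(String id) { return processedIds.contains(id); }
        void markProcessed(String id) { pendingIds.add(id); }
        void insertBusinessRow(String row) { pendingRows.add(row); }
        List<String> committedRows() { return new ArrayList<>(businessRows); }
    }

    /**
     * Hypothetical per-record flow: the duplicate check (the role played by
     * RecordFilterStrategy) and the listener (the role played by @KafkaListener)
     * run inside the same database transaction.
     * Returns true if the record was processed, false if it was filtered.
     */
    static boolean deliver(InMemoryDb db, String messageId, String payload,
                           Consumer<String> listener) {
        db.begin();
        try {
            if (db.isProcessed(messageId)) {
                db.commit();           // nothing was written; duplicate is skipped
                return false;
            }
            listener.accept(payload);  // business write
            db.markProcessed(messageId);
            db.commit();               // both writes become visible atomically
            return true;
        } catch (RuntimeException e) {
            db.rollback();             // neither write survives a failure
            throw e;
        }
    }

    public static void main(String[] args) {
        InMemoryDb db = new InMemoryDb();
        System.out.println(deliver(db, "msg-1", "order created", db::insertBusinessRow)); // true
        System.out.println(deliver(db, "msg-1", "order created", db::insertBusinessRow)); // false
        System.out.println(db.committedRows()); // [order created]
    }
}
```

If the listener throws, the rollback discards both the business row and the processed-ID entry, so a redelivered record is handled as if it were new. Without a shared transaction, a crash between the two writes could leave one committed without the other.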
Problem
Removing support for transactionManager makes it impossible to implement this pattern correctly with database transactions.
Recommendation
Retain support for transactionManager for database transaction usage while aligning Kafka transaction handling with KafkaAwareTransactionManager.
A potential implementation approach could look like this:
if (kafkaAwareTransactionManager != null) {
    invokeRecordListenerInKafkaTx(records); // use AfterRollbackProcessor
} else if (jdbcTransactionManager != null) {
    invokeRecordListenerInJdbcTx(records); // use CommonErrorHandler
} else {
    doInvokeWithRecords(records); // use CommonErrorHandler
}
With this split, KafkaAwareTransactionManager would handle Kafka transactions (including those combined with database transactions), while transactionManager would handle local database-only transactions, preserving the ability to implement Idempotent Receivers safely and consistently.