Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,14 @@
* @since 3.1
*/
String containerPostProcessor() default "";

/**
* Override the container factory's default {@code ackMode} for this listener.
* <p>
* Supports SpEL {@code #{...}} and property placeholders {@code ${...}}.
* @return the ack mode (case-insensitive), or empty string to use factory default.
* @since 4.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
String ackMode() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
resolveErrorHandler(endpoint, kafkaListener);
resolveContentTypeConverter(endpoint, kafkaListener);
resolveFilter(endpoint, kafkaListener);
resolveAckMode(endpoint, kafkaListener);
resolveContainerPostProcessor(endpoint, kafkaListener);
}

Expand Down Expand Up @@ -740,6 +741,16 @@ private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaList
}
}

private void resolveAckMode(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
String ackMode = kafkaListener.ackMode();
if (StringUtils.hasText(ackMode)) {
String ackModeValue = resolveExpressionAsString(ackMode, "ackMode");
if (StringUtils.hasText(ackModeValue)) {
endpoint.setAckMode(ackModeValue);
}
}
}

private @Nullable KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener kafkaListener,
@Nullable Object factoryTarget, String beanName) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ else if (this.autoStartup != null) {
.acceptIfNotNull(endpoint.getConsumerProperties(),
instance.getContainerProperties()::setKafkaConsumerProperties)
.acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo);

// Set ackMode if specified in endpoint (overrides factory default)
String ackMode = endpoint.getAckMode();
if (ackMode != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acceptIfNotNull() API is good to go instead of this if:

.acceptIfNotNull(endpoint.getAckMode(), (ackMode) ->  properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase())))

properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
@Nullable
private String mainListenerId;

private @Nullable String ackMode;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -387,6 +389,25 @@ public void setAutoStartup(@Nullable Boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* Return the ackMode for this endpoint's container.
* @return the ackMode.
* @since 4.1
*/
public @Nullable String getAckMode() {
return this.ackMode;
}

/**
* Set the ackMode for this endpoint's container to override the factory's default.
* @param ackMode the ackMode string (case-insensitive).
* @since 4.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
public void setAckMode(@Nullable String ackMode) {
this.ackMode = ackMode;
}

/**
* Set a configurer which will be invoked when creating a reply message.
* @param replyHeadersConfigurer the configurer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,15 @@ default Boolean getBatchListener() {
return null;
}

/**
* Return the ackMode for this endpoint, or null if not explicitly set.
* @return the ack mode string.
* @since 4.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
@Nullable
default String getAckMode() {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2025-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link KafkaListener} ackMode attribute.
*
* @author GO BEOMJUN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think legal names go fully in upper case.

* @since 4.1
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = {"ackModeRecord", "ackModeManual", "ackModeDefault"}, partitions = 1)
public class KafkaListenerAckModeTests {

@Autowired
private KafkaTemplate<Integer, String> template;

@Autowired
private Config config;

@Autowired
private KafkaListenerEndpointRegistry registry;

@Test
void testAckModeRecordOverride() throws Exception {
this.template.send("ackModeRecord", "test-record");
assertThat(this.config.recordLatch.await(10, TimeUnit.SECONDS)).isTrue();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to test an interaction with a consumer to just verify AckMode propagation.
That is just a configuration phase feature.
Therefore we might fully don't need an @EmbeddedKafka for this test suite.
Feels like just a mock for ConsumerFactory is enough.
And we won't need producer part at all here.


// Verify that the listener container has the correct ack mode
MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.RECORD);
}

@Test
void testAckModeManualOverride() throws Exception {
this.template.send("ackModeManual", "test-manual");
assertThat(this.config.manualLatch.await(10, TimeUnit.SECONDS)).isTrue();

// Verify that the listener container has the correct ack mode
MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.MANUAL);
}

@Test
void testAckModeDefault() throws Exception {
this.template.send("ackModeDefault", "test-default");
assertThat(this.config.defaultLatch.await(10, TimeUnit.SECONDS)).isTrue();

// Verify that the listener container uses factory default (BATCH)
MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.BATCH);
}

@Configuration
@EnableKafka
public static class Config {

final CountDownLatch recordLatch = new CountDownLatch(1);

final CountDownLatch manualLatch = new CountDownLatch(1);

final CountDownLatch defaultLatch = new CountDownLatch(1);

@Bean
public ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(broker, "testGroup", false));
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(consumerProps);
}

@Bean
public ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(broker));
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// Set factory default to BATCH
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}

@KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD")
public void listenWithRecordAck(String message) {
this.recordLatch.countDown();
}

@KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL")
public void listenWithManualAck(String message, Acknowledgment ack) {
ack.acknowledge();
this.manualLatch.countDown();
}

@KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault")
public void listenWithDefaultAck(String message) {
this.defaultLatch.countDown();
}

}

}