This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.2.4! |
@KafkaListener
Annotation
The @KafkaListener
annotation is used to designate a bean method as a listener for a listener container.
The bean is wrapped in a MessagingMessageListenerAdapter
configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
You can configure most attributes on the annotation with SpEL by using #{…}
or property placeholders (${…}
).
See the Javadoc for more information.
Record Listeners
The @KafkaListener
annotation provides a mechanism for simple POJO listeners.
The following example shows how to use it:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
This mechanism requires an @EnableKafka
annotation on one of your @Configuration
classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer
.
By default, a bean with name kafkaListenerContainerFactory
is expected.
The following example shows how to use ConcurrentMessageListenerContainer
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
Notice that, to set container properties, you must use the getContainerProperties()
method on the factory.
It is used as a template for the actual properties injected into the container.
Starting with version 2.1.1, you can now set the client.id
property for consumers created by the annotation.
The clientIdPrefix
is suffixed with -n
, where n
is an integer representing the container number when using concurrency.
Starting with version 2.2, you can now override the container factory’s concurrency
and autoStartup
properties by using properties on the annotation itself.
The properties can be simple values, property placeholders, or SpEL expressions.
The following example shows how to do so:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
Explicit Partition Assignment
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets). The following example shows how to do so:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
You can specify each partition in the partitions
or partitionOffsets
attribute but not both.
As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see Manually Assigning All Partitions.
Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The *
wildcard represents all partitions in the partitions
attribute.
There must only be one @PartitionOffset
with the wildcard in each @TopicPartition
.
In addition, when the listener implements ConsumerSeekAware
, onPartitionsAssigned
is now called, even when using manual assignment.
This allows, for example, any arbitrary seek operations at that time.
Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
The range is inclusive; the example above will assign partitions 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
.
The same technique can be used when specifying initial offsets:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The initial offset will be applied to all 6 partitions.
Since 3.2, @PartitionOffset
support SeekPosition.END
, SeekPosition.BEGINNING
, SeekPosition.TIMESTAMP
, seekPosition
match SeekPosition
enum name:
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
If seekPosition set END
or BEGINNING
will ignore initialOffset
and relativeToCurrent
.
If seekPosition set TIMESTAMP
, initialOffset
means timestamp.
Manual Acknowledgment
When using manual AckMode
, you can also provide the listener with the Acknowledgment
.
To activate the manual AckMode
, you need to set the ack-mode in ContainerProperties
to the appropriate manual mode.
The following example also shows how to use a different container factory.
This custom container factory must set the AckMode
to a manual type by calling the getContainerProperties()
and then calling setAckMode
on it.
Otherwise, the Acknowledgment
object will be null.
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
Consumer Record Metadata
Finally, metadata about the record is available from message headers. You can use the following header names to retrieve the headers of the message:
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
Starting with version 2.5 the RECEIVED_KEY
is not present if the incoming record has a null
key; previously the header was populated with a null
value.
This change is to make the framework consistent with spring-messaging
conventions where null
valued headers are not present.
The following example shows how to use the headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
Parameter annotations (@Payload , @Header ) must be specified on the concrete implementation of the listener method; they will not be detected if they are defined on an interface.
|
Starting with version 2.5, instead of using discrete headers, you can receive record metadata in a ConsumerRecordMetadata
parameter.
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
This contains all the data from the ConsumerRecord
except the key and value.
Batch Listeners
Starting with version 1.1, you can configure @KafkaListener
methods to receive the entire batch of consumer records received from the consumer poll.
Non-Blocking Retries are not supported with batch listeners. |
To configure the listener container factory to create batch listeners, you can set the batchListener
property.
The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
Starting with version 2.8, you can override the factory’s batchListener property using the batch property on the @KafkaListener annotation.
This, together with the changes to Container Error Handlers allows the same factory to be used for both record and batch listeners.
|
Starting with version 2.9.6, the container factory has separate setters for the recordMessageConverter and batchMessageConverter properties.
Previously, there was only one property messageConverter which applied to both record and batch listeners.
|
The following example shows how to receive a list of payloads:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
The topic, partition, offset, and so on are available in headers that parallel the payloads. The following example shows how to use the headers:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
Alternatively, you can receive a List
of Message<?>
objects with each offset and other details in each message, but it must be the only parameter (aside from optional Acknowledgment
, when using manual commits, and/or Consumer<?, ?>
parameters) defined on the method.
The following example shows how to do so:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
No conversion is performed on the payloads in this case.
If the BatchMessagingMessageConverter
is configured with a RecordMessageConverter
, you can also add a generic type to the Message
parameter and the payloads are converted.
See Payload Conversion with Batch Listeners for more information.
You can also receive a list of ConsumerRecord<?, ?>
objects, but it must be the only parameter (aside from optional Acknowledgment
, when using manual commits and Consumer<?, ?>
parameters) defined on the method.
The following example shows how to do so:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
Starting with version 2.2, the listener can receive the complete ConsumerRecords<?, ?>
object returned by the poll()
method, letting the listener access additional methods, such as partitions()
(which returns the TopicPartition
instances in the list) and records(TopicPartition)
(which gets selective records).
Again, this must be the only parameter (aside from optional Acknowledgment
, when using manual commits or Consumer<?, ?>
parameters) on the method.
The following example shows how to do so:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
If the container factory has a RecordFilterStrategy configured, it is ignored for ConsumerRecords<?, ?> listeners, with a WARN log message emitted.
Records can only be filtered with a batch listener if the <List<?>> form of listener is used.
By default, records are filtered one-at-a-time; starting with version 2.8, you can override filterBatch to filter the entire batch in one call.
|
Annotation Properties
Starting with version 2.0, the id
property (if present) is used as the Kafka consumer group.id
property, overriding the configured property in the consumer factory, if present.
You can also set groupId
explicitly or set idIsGroup
to false to restore the previous behavior of using the consumer factory group.id
.
You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
Starting with version 2.1.2, the SpEL expressions support a special token: __listener
.
It is a pseudo bean name that represents the current bean instance within which this annotation exists.
Consider the following example:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
Given the beans in the previous example, we can then use the following:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
If, in the unlikely event that you have an actual bean called __listener
, you can change the expression token by using the beanRef
attribute.
The following example shows how to do so:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation, these will override any properties with the same name configured in the consumer factory. You cannot specify the group.id
and client.id
properties this way; they will be ignored; use the groupId
and clientIdPrefix
annotation properties for those.
The properties are specified as individual strings with the normal Java Properties
file format: foo:bar
, foo=bar
, or foo bar
, as the following example shows:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
The following is an example of the corresponding listeners for the example in Using RoutingKafkaTemplate
.
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}