Annotation Interface KafkaListener
containerFactory()
identifies the
KafkaListenerContainerFactory
to use to build the Kafka listener container. If not
set, a default container factory is assumed to be available with a bean name
of kafkaListenerContainerFactory
unless an explicit default has been provided
through configuration.
Processing of @KafkaListener
annotations is performed by registering a
KafkaListenerAnnotationBeanPostProcessor
. This can be done manually or, more
conveniently, through EnableKafka
annotation.
Annotated methods are allowed to have flexible signatures similar to what
MessageMapping
provides, that is
ConsumerRecord
to access to the raw Kafka messageAcknowledgment
to manually ack@Payload
-annotated method arguments including the support of validation@Header
-annotated method arguments to extract a specific header value, defined byKafkaHeaders
@Headers
-annotated argument that must also be assignable toMap
for getting access to all headers.MessageHeaders
arguments for getting access to all headers.MessageHeaderAccessor
for convenient access to all method arguments.
When defined at the method level, a listener container is created for each method.
The MessageListener
is a
MessagingMessageListenerAdapter
,
configured with a MethodKafkaListenerEndpoint
.
When defined at the class level, a single message listener container is used to
service all methods annotated with @KafkaHandler
. Method signatures of such
annotated methods must not cause any ambiguity such that a single method can be
resolved for a particular inbound message. The
MessagingMessageListenerAdapter
is
configured with a
MultiMethodKafkaListenerEndpoint
.
- Author:
- Gary Russell, Venil Noronha
- See Also:
-
Optional Element Summary
Optional ElementsModifier and TypeOptional ElementDescriptionSet to true or false, to override the default setting in the container factory.Override the container factory'sbatchListener
property.A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined.When provided, overrides the client id property in the consumer factory configuration.Override the container factory'sconcurrency
setting for this listener.The bean name of theKafkaListenerContainerFactory
to use to create the message listener container responsible to serve this endpoint.If provided, the listener container for this listener will be added to a bean with this value as its name, of typeCollection<MessageListenerContainer>
.Set the bean name of aContainerPostProcessor
to allow customizing the container after its creation and configuration.Set the bean name of aSmartMessageConverter
(such as theCompositeMessageConverter
) to use in conjunction with theMessageHeaders.CONTENT_TYPE
header to perform the conversion to the required type.Set anKafkaListenerErrorHandler
bean name to invoke if the listener method throws an exception.Set anRecordFilterStrategy
bean name to override the strategy configured on the container factory.Override thegroup.id
property for the consumer factory with this value for this listener only.The unique identifier of the container for this listener.boolean
Static information that will be added as a header with keyKafkaHeaders.LISTENER_INFO
.String[]
Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).boolean
When false and the return type is anIterable
return the result as the value of a single reply record instead of individual records for each element.The topicPartitions for this listener when using manual topic/partition assignment.The topic pattern for this listener.String[]
The topics for this listener.
-
Element Details
-
id
String idThe unique identifier of the container for this listener.If none is specified an auto-generated id is used.
Note: When provided, this value will override the group id property in the consumer factory configuration, unless
idIsGroup()
is set to false orgroupId()
is provided.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- the
id
for the container managing for this endpoint. - See Also:
- Default:
- ""
-
containerFactory
String containerFactoryThe bean name of theKafkaListenerContainerFactory
to use to create the message listener container responsible to serve this endpoint.If not specified, the default container factory is used, if any. If a SpEL expression is provided (
#{...}
), the expression can either evaluate to a container factory instance or a bean name.- Returns:
- the container factory bean name.
- Default:
- ""
-
topics
String[] topicsThe topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic name. This uses group management and Kafka will assign partitions to group members.Mutually exclusive with
topicPattern()
andtopicPartitions()
.- Returns:
- the topic names or expressions (SpEL) to listen to.
- Default:
- {}
-
topicPattern
String topicPatternThe topic pattern for this listener. The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check. An expression must be resolved to the topic pattern (String or Pattern result types are supported). This uses group management and Kafka will assign partitions to group members.Mutually exclusive with
topics()
andtopicPartitions()
.- Returns:
- the topic pattern or expression (SpEL).
- See Also:
-
CommonClientConfigs.METADATA_MAX_AGE_CONFIG
- Default:
- ""
-
topicPartitions
TopicPartition[] topicPartitionsThe topicPartitions for this listener when using manual topic/partition assignment.Mutually exclusive with
topicPattern()
andtopics()
.- Returns:
- the topic names or expressions (SpEL) to listen to.
- Default:
- {}
-
containerGroup
String containerGroupIf provided, the listener container for this listener will be added to a bean with this value as its name, of typeCollection<MessageListenerContainer>
. This allows, for example, iteration over the collection to start/stop a subset of containers. TheCollection
beans are deprecated as of version 2.7.3 and will be removed in 2.8. Instead, a bean with namecontainerGroup + ".group"
and typeContainerGroup
should be used instead.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- the bean name for the group.
- Default:
- ""
-
errorHandler
String errorHandlerSet anKafkaListenerErrorHandler
bean name to invoke if the listener method throws an exception. If a SpEL expression is provided (#{...}
), the expression can either evaluate to aKafkaListenerErrorHandler
instance or a bean name.- Returns:
- the error handler.
- Since:
- 1.3
- Default:
- ""
-
groupId
String groupIdOverride thegroup.id
property for the consumer factory with this value for this listener only.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- the group id.
- Since:
- 1.3
- Default:
- ""
-
idIsGroup
boolean idIsGroupWhengroupId
is not provided, use theid
(if provided) as thegroup.id
property for the consumer. Set to false, to use thegroup.id
from the consumer factory.- Returns:
- false to disable.
- Since:
- 1.3
- Default:
- true
-
clientIdPrefix
String clientIdPrefixWhen provided, overrides the client id property in the consumer factory configuration. A suffix ('-n') is added for each container instance to ensure uniqueness when concurrency is used.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- the client id prefix.
- Since:
- 2.1.1
- Default:
- ""
-
beanRef
String beanRefA pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined. This allows access to properties and methods within the enclosing bean. Default '__listener'.Example:
topics = "#{__listener.topicList}"
.- Returns:
- the pseudo bean name.
- Since:
- 2.1.2
- Default:
- "__listener"
-
concurrency
String concurrencyOverride the container factory'sconcurrency
setting for this listener. May be a property placeholder or SpEL expression that evaluates to aNumber
, in which caseNumber.intValue()
is used to obtain the value.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- the concurrency.
- Since:
- 2.2
- Default:
- ""
-
autoStartup
String autoStartupSet to true or false, to override the default setting in the container factory. May be a property placeholder or SpEL expression that evaluates to aBoolean
or aString
, in which case theBoolean.parseBoolean(String)
is used to obtain the value.SpEL
#{...}
and property place holders${...}
are supported.- Returns:
- true to auto start, false to not auto start.
- Since:
- 2.2
- Default:
- ""
-
properties
String[] propertiesKafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).Supported Syntax
The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file:
key=value
key:value
key value
group.id
andclient.id
are ignored.SpEL
#{...}
and property place holders${...}
are supported. SpEL expressions must resolve to aString
, a @{link String[]} or aCollection<String>
where each member of the array or collection is a property name + value with the above formats.- Returns:
- the properties.
- Since:
- 2.2.4
- See Also:
-
ConsumerConfig
groupId()
clientIdPrefix()
- Default:
- {}
-
splitIterables
boolean splitIterablesWhen false and the return type is anIterable
return the result as the value of a single reply record instead of individual records for each element. Default true. Ignored if the reply is of typeIterable<Message<?>>
.- Returns:
- false to create a single reply record.
- Since:
- 2.3.5
- Default:
- true
-
contentTypeConverter
String contentTypeConverterSet the bean name of aSmartMessageConverter
(such as theCompositeMessageConverter
) to use in conjunction with theMessageHeaders.CONTENT_TYPE
header to perform the conversion to the required type. If a SpEL expression is provided (#{...}
), the expression can either evaluate to aSmartMessageConverter
instance or a bean name.- Returns:
- the bean name.
- Since:
- 2.7.1
- Default:
- ""
-
batch
String batchOverride the container factory'sbatchListener
property. The listener method signature should receive aList<?>
; refer to the reference documentation. This allows a single container factory to be used for both record and batch listeners; previously separate container factories were required.- Returns:
- "true" for the annotated method to be a batch listener or "false" for a record listener. If not set, the container factory setting is used. SpEL and property placeholders are not supported because the listener type cannot be variable.
- Since:
- 2.8
- See Also:
- Default:
- ""
-
filter
String filterSet anRecordFilterStrategy
bean name to override the strategy configured on the container factory. If a SpEL expression is provided (#{...}
), the expression can either evaluate to aRecordFilterStrategy
instance or a bean name.- Returns:
- the filter.
- Since:
- 2.8.4
- Default:
- ""
-
info
String infoStatic information that will be added as a header with keyKafkaHeaders.LISTENER_INFO
. This can be used, for example, in aRecordInterceptor
,RecordFilterStrategy
or the listener itself, for any purposes.SpEL
#{...}
and property place holders${...}
are supported, but it must resolve to a String orbyte[]
.This header will be stripped out if an outbound record is created with the headers from an input record.
- Returns:
- the info.
- Since:
- 2.8.4
- Default:
- ""
-
containerPostProcessor
String containerPostProcessorSet the bean name of aContainerPostProcessor
to allow customizing the container after its creation and configuration. This post processor is only applied on the current listener container in contrast to theContainerCustomizer
which is applied on all listener containers. This post processor is applied after the container customizer (if present).- Returns:
- the bean name of the container post processor.
- Since:
- 3.1
- Default:
- ""
-