Annotation Interface KafkaListener


Annotation that marks a method to be the target of a Kafka message listener on the specified topics. The containerFactory() identifies the KafkaListenerContainerFactory to use to build the Kafka listener container. If not set, a default container factory is assumed to be available with a bean name of kafkaListenerContainerFactory unless an explicit default has been provided through configuration.

Processing of @KafkaListener annotations is performed by registering a KafkaListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through EnableKafka annotation.

Annotated methods are allowed to have flexible signatures similar to what MessageMapping provides, that is

  • ConsumerRecord to access to the raw Kafka message
  • Acknowledgment to manually ack
  • @Payload-annotated method arguments including the support of validation
  • @Header-annotated method arguments to extract a specific header value, defined by KafkaHeaders
  • @Headers-annotated argument that must also be assignable to Map for getting access to all headers.
  • MessageHeaders arguments for getting access to all headers.
  • MessageHeaderAccessor for convenient access to all method arguments.

When defined at the method level, a listener container is created for each method. The MessageListener is a MessagingMessageListenerAdapter, configured with a MethodKafkaListenerEndpoint.

When defined at the class level, a single message listener container is used to service all methods annotated with @KafkaHandler. Method signatures of such annotated methods must not cause any ambiguity such that a single method can be resolved for a particular inbound message. The MessagingMessageListenerAdapter is configured with a MultiMethodKafkaListenerEndpoint.

Author:
Gary Russell, Venil Noronha
See Also:
  • Element Details

    • id

      String id
      The unique identifier of the container for this listener.

      If none is specified an auto-generated id is used.

      Note: When provided, this value will override the group id property in the consumer factory configuration, unless idIsGroup() is set to false or groupId() is provided.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      the id for the container managing for this endpoint.
      See Also:
      Default:
      ""
    • containerFactory

      String containerFactory
      The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible to serve this endpoint.

      If not specified, the default container factory is used, if any. If a SpEL expression is provided (#{...}), the expression can either evaluate to a container factory instance or a bean name.

      Returns:
      the container factory bean name.
      Default:
      ""
    • topics

      String[] topics
      The topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic name. This uses group management and Kafka will assign partitions to group members.

      Mutually exclusive with topicPattern() and topicPartitions().

      Returns:
      the topic names or expressions (SpEL) to listen to.
      Default:
      {}
    • topicPattern

      String topicPattern
      The topic pattern for this listener. The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check. An expression must be resolved to the topic pattern (String or Pattern result types are supported). This uses group management and Kafka will assign partitions to group members.

      Mutually exclusive with topics() and topicPartitions().

      Returns:
      the topic pattern or expression (SpEL).
      See Also:
      • CommonClientConfigs.METADATA_MAX_AGE_CONFIG
      Default:
      ""
    • topicPartitions

      TopicPartition[] topicPartitions
      The topicPartitions for this listener when using manual topic/partition assignment.

      Mutually exclusive with topicPattern() and topics().

      Returns:
      the topic names or expressions (SpEL) to listen to.
      Default:
      {}
    • containerGroup

      String containerGroup
      If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer>. This allows, for example, iteration over the collection to start/stop a subset of containers. The Collection beans are deprecated as of version 2.7.3 and will be removed in 2.8. Instead, a bean with name containerGroup + ".group" and type ContainerGroup should be used instead.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      the bean name for the group.
      Default:
      ""
    • errorHandler

      String errorHandler
      Set an KafkaListenerErrorHandler bean name to invoke if the listener method throws an exception. If a SpEL expression is provided (#{...}), the expression can either evaluate to a KafkaListenerErrorHandler instance or a bean name.
      Returns:
      the error handler.
      Since:
      1.3
      Default:
      ""
    • groupId

      String groupId
      Override the group.id property for the consumer factory with this value for this listener only.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      the group id.
      Since:
      1.3
      Default:
      ""
    • idIsGroup

      boolean idIsGroup
      When groupId is not provided, use the id (if provided) as the group.id property for the consumer. Set to false, to use the group.id from the consumer factory.
      Returns:
      false to disable.
      Since:
      1.3
      Default:
      true
    • clientIdPrefix

      String clientIdPrefix
      When provided, overrides the client id property in the consumer factory configuration. A suffix ('-n') is added for each container instance to ensure uniqueness when concurrency is used.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      the client id prefix.
      Since:
      2.1.1
      Default:
      ""
    • beanRef

      String beanRef
      A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined. This allows access to properties and methods within the enclosing bean. Default '__listener'.

      Example: topics = "#{__listener.topicList}".

      Returns:
      the pseudo bean name.
      Since:
      2.1.2
      Default:
      "__listener"
    • concurrency

      String concurrency
      Override the container factory's concurrency setting for this listener. May be a property placeholder or SpEL expression that evaluates to a Number, in which case Number.intValue() is used to obtain the value.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      the concurrency.
      Since:
      2.2
      Default:
      ""
    • autoStartup

      String autoStartup
      Set to true or false, to override the default setting in the container factory. May be a property placeholder or SpEL expression that evaluates to a Boolean or a String, in which case the Boolean.parseBoolean(String) is used to obtain the value.

      SpEL #{...} and property place holders ${...} are supported.

      Returns:
      true to auto start, false to not auto start.
      Since:
      2.2
      Default:
      ""
    • properties

      String[] properties
      Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).

      Supported Syntax

      The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file:

      • key=value
      • key:value
      • key value
      group.id and client.id are ignored.

      SpEL #{...} and property place holders ${...} are supported. SpEL expressions must resolve to a String, a @{link String[]} or a Collection<String> where each member of the array or collection is a property name + value with the above formats.

      Returns:
      the properties.
      Since:
      2.2.4
      See Also:
      Default:
      {}
    • splitIterables

      boolean splitIterables
      When false and the return type is an Iterable return the result as the value of a single reply record instead of individual records for each element. Default true. Ignored if the reply is of type Iterable<Message<?>>.
      Returns:
      false to create a single reply record.
      Since:
      2.3.5
      Default:
      true
    • contentTypeConverter

      String contentTypeConverter
      Set the bean name of a SmartMessageConverter (such as the CompositeMessageConverter) to use in conjunction with the MessageHeaders.CONTENT_TYPE header to perform the conversion to the required type. If a SpEL expression is provided (#{...}), the expression can either evaluate to a SmartMessageConverter instance or a bean name.
      Returns:
      the bean name.
      Since:
      2.7.1
      Default:
      ""
    • batch

      String batch
      Override the container factory's batchListener property. The listener method signature should receive a List<?>; refer to the reference documentation. This allows a single container factory to be used for both record and batch listeners; previously separate container factories were required.
      Returns:
      "true" for the annotated method to be a batch listener or "false" for a record listener. If not set, the container factory setting is used. SpEL and property placeholders are not supported because the listener type cannot be variable.
      Since:
      2.8
      See Also:
      Default:
      ""
    • filter

      String filter
      Set an RecordFilterStrategy bean name to override the strategy configured on the container factory. If a SpEL expression is provided (#{...}), the expression can either evaluate to a RecordFilterStrategy instance or a bean name.
      Returns:
      the filter.
      Since:
      2.8.4
      Default:
      ""
    • info

      String info
      Static information that will be added as a header with key KafkaHeaders.LISTENER_INFO. This can be used, for example, in a RecordInterceptor, RecordFilterStrategy or the listener itself, for any purposes.

      SpEL #{...} and property place holders ${...} are supported, but it must resolve to a String or byte[].

      This header will be stripped out if an outbound record is created with the headers from an input record.

      Returns:
      the info.
      Since:
      2.8.4
      Default:
      ""
    • containerPostProcessor

      String containerPostProcessor
      Set the bean name of a ContainerPostProcessor to allow customizing the container after its creation and configuration. This post processor is only applied on the current listener container in contrast to the ContainerCustomizer which is applied on all listener containers. This post processor is applied after the container customizer (if present).
      Returns:
      the bean name of the container post processor.
      Since:
      3.1
      Default:
      ""