Annotation Type KafkaListener


  • @Target({TYPE,METHOD,ANNOTATION_TYPE})
    @Retention(RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(KafkaListeners.class)
    public @interface KafkaListener
    Annotation that marks a method to be the target of a Kafka message listener on the specified topics. The containerFactory() identifies the KafkaListenerContainerFactory to use to build the Kafka listener container. If not set, a default container factory is assumed to be available with a bean name of kafkaListenerContainerFactory unless an explicit default has been provided through configuration.

    Processing of @KafkaListener annotations is performed by registering a KafkaListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the @EnableKafka annotation.

    Annotated methods are allowed to have flexible signatures, similar to what @MessageMapping provides, such as:

    • ConsumerRecord to access the raw Kafka message
    • Acknowledgment to acknowledge manually
    • @Payload-annotated method arguments, including support for validation
    • @Header-annotated method arguments to extract a specific header value, defined by KafkaHeaders
    • @Headers-annotated argument that must also be assignable to Map for getting access to all headers.
    • MessageHeaders arguments for getting access to all headers.
    • MessageHeaderAccessor for convenient access to all headers.
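
    To illustrate the signatures listed above, the following sketch declares three listener methods. The class name, the listener ids and the topic name "orders" are hypothetical. It assumes String key and value deserializers, and the Acknowledgment variant assumes the container factory is configured with a manual ack mode.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

// Hypothetical listener bean; names and topics are illustrative only.
@Component
public class OrderListeners {

    // Receives the raw Kafka record (assumes String key/value deserializers).
    @KafkaListener(id = "rawOrders", topics = "orders")
    public void onRecord(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }

    // Receives the converted payload plus one specific header value.
    @KafkaListener(id = "orderPayloads", topics = "orders")
    public void onPayload(@Payload String body,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println("Received " + body + " from " + topic);
    }

    // Acknowledges manually; assumes a MANUAL or MANUAL_IMMEDIATE ack mode on the container.
    @KafkaListener(id = "manualAckOrders", topics = "orders")
    public void onPayloadWithAck(String body, Acknowledgment ack) {
        System.out.println("Processing " + body);
        ack.acknowledge();
    }
}
```

    Because each method has its own id and no groupId, each listener here would consume in its own consumer group.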

    When defined at the method level, a listener container is created for each method. The MessageListener is a MessagingMessageListenerAdapter, configured with a MethodKafkaListenerEndpoint.

    When defined at the class level, a single message listener container is used to service all methods annotated with @KafkaHandler. The signatures of these methods must be unambiguous, so that exactly one method can be resolved for a particular inbound message. The MessagingMessageListenerAdapter is configured with a MultiMethodKafkaListenerEndpoint.
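
    A minimal sketch of the class-level form follows. The class name, id and topic are hypothetical, and it assumes a deserializer or message converter that produces String and Integer payloads so that each record resolves to exactly one handler.

```java
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical class-level listener: one container serves all @KafkaHandler methods.
@Component
@KafkaListener(id = "multiTypeEvents", topics = "events")
public class MultiTypeListener {

    // Chosen when the converted payload is a String.
    @KafkaHandler
    public void onText(String text) {
        System.out.println("text: " + text);
    }

    // Chosen when the converted payload is an Integer.
    @KafkaHandler
    public void onNumber(Integer number) {
        System.out.println("number: " + number);
    }

    // Fallback for payload types that no other handler accepts.
    @KafkaHandler(isDefault = true)
    public void onOther(Object other) {
        System.out.println("other: " + other);
    }
}
```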

    Author:
    Gary Russell, Venil Noronha
    See Also:
    EnableKafka, KafkaListenerAnnotationBeanPostProcessor, KafkaListeners
    • Optional Element Summary

      Optional Elements 
      Modifier and Type Optional Element Description
      java.lang.String autoStartup
      Set to true or false, to override the default setting in the container factory.
      java.lang.String beanRef
      A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined.
      java.lang.String clientIdPrefix
      When provided, overrides the client id property in the consumer factory configuration.
      java.lang.String concurrency
      Override the container factory's concurrency setting for this listener.
      java.lang.String containerFactory
      The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible for serving this endpoint.
      java.lang.String containerGroup
      If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer>.
      java.lang.String errorHandler
      Set a KafkaListenerErrorHandler bean name to invoke if the listener method throws an exception.
      java.lang.String groupId
      Override the group.id property for the consumer factory with this value for this listener only.
      java.lang.String id
      The unique identifier of the container for this listener.
      boolean idIsGroup
      When groupId is not provided, use the id (if provided) as the group.id property for the consumer.
      java.lang.String[] properties
      Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).
      boolean splitIterables
      When false and the return type is an Iterable, return the result as the value of a single reply record instead of individual records for each element.
      TopicPartition[] topicPartitions
      The topicPartitions for this listener when using manual topic/partition assignment.
      java.lang.String topicPattern
      The topic pattern for this listener.
      java.lang.String[] topics
      The topics for this listener.
    • Element Detail

      • id

        java.lang.String id
        The unique identifier of the container for this listener.

        If none is specified an auto-generated id is used.

        Note: When provided, this value will override the group id property in the consumer factory configuration, unless idIsGroup() is set to false or groupId() is provided.

        SpEL #{...} and property placeholders ${...} are supported.

        Returns:
        the id of the container managing this endpoint.
        See Also:
        KafkaListenerEndpointRegistry.getListenerContainer(String)
        Default:
        ""
      • containerFactory

        java.lang.String containerFactory
        The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible for serving this endpoint.

        If not specified, the default container factory is used, if any.

        Returns:
        the container factory bean name.
        Default:
        ""
      • topics

        java.lang.String[] topics
        The topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic name. This uses group management and Kafka will assign partitions to group members.

        Mutually exclusive with topicPattern() and topicPartitions().

        Returns:
        the topic names or expressions (SpEL) to listen to.
        Default:
        {}
      • topicPattern

        java.lang.String topicPattern
        The topic pattern for this listener. The entry can be a 'topic pattern', a 'property-placeholder key' or an 'expression'. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against the topics existing at the time of the check. An expression must resolve to the topic pattern (String or Pattern result types are supported). This uses group management and Kafka will assign partitions to group members.

        Mutually exclusive with topics() and topicPartitions().
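
        To get a feel for which topics a given pattern would select, the sketch below filters a list of topic names with java.util.regex.Pattern. It assumes the whole topic name must match the pattern; the helper, the pattern and the topic names are hypothetical and only approximate what happens at subscription time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternDemo {

    // Hypothetical helper: returns the topics whose full name matches the regex.
    static List<String> matchingTopics(String regex, List<String> existingTopics) {
        Pattern pattern = Pattern.compile(regex);
        return existingTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders.eu", "orders.us", "payments.eu", "orders");
        // "orders" is excluded because the pattern requires a literal dot after the prefix.
        System.out.println(matchingTopics("orders\\..*", topics));
    }
}
```

        The same expression would be supplied to the annotation as topicPattern = "orders\\..*".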

        Returns:
        the topic pattern or expression (SpEL).
        See Also:
        CommonClientConfigs.METADATA_MAX_AGE_CONFIG
        Default:
        ""
      • topicPartitions

        TopicPartition[] topicPartitions
        The topicPartitions for this listener when using manual topic/partition assignment.

        Mutually exclusive with topicPattern() and topics().
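
        A sketch of manual assignment follows. The class name, listener id, topic names, partitions and initial offset are hypothetical values chosen for illustration.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

// Hypothetical listener using explicit topic/partition assignment instead of group management.
@Component
public class ManualAssignmentListener {

    @KafkaListener(id = "manualAssign", topicPartitions = {
            // Partitions 0 and 1 of topic1.
            @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
            // Partition 0 of topic2, plus partition 1 starting at offset 100.
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
    }
}
```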

        Returns:
        the topic/partition assignments to listen to.
        Default:
        {}
      • containerGroup

        java.lang.String containerGroup
        If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer>. This allows, for example, iteration over the collection to start/stop a subset of containers.

        SpEL #{...} and property placeholders ${...} are supported.

        Returns:
        the bean name for the group.
        Default:
        ""
      • errorHandler

        java.lang.String errorHandler
        Set a KafkaListenerErrorHandler bean name to invoke if the listener method throws an exception.
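
        As a sketch of how the bean name is wired, the configuration below declares an error handler bean and references it from a listener. All class, bean, id and topic names are hypothetical. Returning null from the handler is assumed here to mean that the exception is treated as handled and no reply is produced.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;

// Hypothetical configuration; bean and topic names are illustrative only.
@Configuration
public class ListenerErrorHandlingConfig {

    // The bean name "loggingErrorHandler" is what the annotation refers to.
    @Bean
    public KafkaListenerErrorHandler loggingErrorHandler() {
        return (message, exception) -> {
            System.err.println("Listener failed for payload "
                    + message.getPayload() + ": " + exception.getMessage());
            return null; // nothing to send as a reply
        };
    }

    @Bean
    public OrderListener orderListener() {
        return new OrderListener();
    }

    public static class OrderListener {

        @KafkaListener(id = "ordersWithHandler", topics = "orders",
                errorHandler = "loggingErrorHandler")
        public void listen(String order) {
            // Any exception thrown here is passed to the error handler bean.
            throw new IllegalStateException("cannot process " + order);
        }
    }
}
```
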
        Returns:
        the error handler.
        Since:
        1.3
        Default:
        ""
      • groupId

        java.lang.String groupId
        Override the group.id property for the consumer factory with this value for this listener only.

        SpEL #{...} and property placeholders ${...} are supported.

        Returns:
        the group id.
        Since:
        1.3
        Default:
        ""
      • idIsGroup

        boolean idIsGroup
        When groupId is not provided, use the id (if provided) as the group.id property for the consumer. Set to false, to use the group.id from the consumer factory.
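
        The precedence between groupId, id and the consumer factory's group.id can be summarized with a small sketch. The helper below is hypothetical and not part of the framework; it only mirrors the rules stated for id(), groupId() and idIsGroup().

```java
final class GroupIdResolution {

    // Hypothetical helper mirroring the documented precedence; not a spring-kafka API.
    static String effectiveGroupId(String groupId, String id, boolean idIsGroup,
                                   String factoryGroupId) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;          // an explicit groupId always wins
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;               // otherwise the listener id is used, if allowed
        }
        return factoryGroupId;       // otherwise keep the consumer factory's group.id
    }

    public static void main(String[] args) {
        System.out.println(effectiveGroupId("billing", "orders", true, "factoryGroup"));
        System.out.println(effectiveGroupId("", "orders", true, "factoryGroup"));
        System.out.println(effectiveGroupId("", "orders", false, "factoryGroup"));
    }
}
```
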
        Returns:
        false to disable.
        Since:
        1.3
        Default:
        true
      • clientIdPrefix

        java.lang.String clientIdPrefix
        When provided, overrides the client id property in the consumer factory configuration. A suffix ('-n') is added for each container instance to ensure uniqueness when concurrency is used.

        SpEL #{...} and property placeholders ${...} are supported.

        Returns:
        the client id prefix.
        Since:
        2.1.1
        Default:
        ""
      • beanRef

        java.lang.String beanRef
        A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined. This allows access to properties and methods within the enclosing bean. Default '__listener'.

        Example: topics = "#{__listener.topicList}".
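
        A slightly fuller sketch of the example above: the enclosing bean exposes a getter that the SpEL expression reads through the pseudo bean name. The class name, listener id and topic names are hypothetical.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical bean whose own property supplies the topics for its listener.
@Component
public class DynamicTopicsListener {

    // Read by the expression #{__listener.topicList} on the annotation below.
    public String[] getTopicList() {
        return new String[] { "topicA", "topicB" };
    }

    @KafkaListener(id = "dynamicTopics", topics = "#{__listener.topicList}")
    public void listen(String payload) {
        System.out.println("Received " + payload);
    }
}
```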

        Returns:
        the pseudo bean name.
        Since:
        2.1.2
        Default:
        "__listener"
      • concurrency

        java.lang.String concurrency
        Override the container factory's concurrency setting for this listener. May be a property placeholder or SpEL expression that evaluates to a Number, in which case Number.intValue() is used to obtain the value.

        SpEL #{...} and property placeholders ${...} are supported.
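
        The effect of Number.intValue() on a resolved value can be seen in the sketch below. The helper is hypothetical; the handling of a String result via Integer.parseInt is an assumption, not something stated here.

```java
final class ConcurrencyValueDemo {

    // Hypothetical helper: converts a resolved placeholder/SpEL result to an int.
    static int toConcurrency(Object resolved) {
        if (resolved instanceof Number) {
            return ((Number) resolved).intValue();   // fractional part is truncated
        }
        if (resolved instanceof String) {
            // Assumption: plain strings such as "3" are parsed as integers.
            return Integer.parseInt(((String) resolved).trim());
        }
        throw new IllegalArgumentException("Unsupported concurrency value: " + resolved);
    }

    public static void main(String[] args) {
        System.out.println(toConcurrency(3.9));   // a Double result is truncated
        System.out.println(toConcurrency(4L));
        System.out.println(toConcurrency("2"));
    }
}
```

        Note that intValue() truncates rather than rounds, so an expression that evaluates to 3.9 yields a concurrency of 3.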

        Returns:
        the concurrency.
        Since:
        2.2
        Default:
        ""
      • autoStartup

        java.lang.String autoStartup
        Set to true or false, to override the default setting in the container factory. May be a property placeholder or SpEL expression that evaluates to a Boolean or a String, in which case Boolean.parseBoolean(String) is used to obtain the value.

        SpEL #{...} and property placeholders ${...} are supported.
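
        Because String results go through Boolean.parseBoolean(String), only the text "true" (ignoring case) enables auto startup. The hypothetical helper below sketches that conversion; it is not a framework API.

```java
final class AutoStartupValueDemo {

    // Hypothetical helper: converts a resolved placeholder/SpEL result to a boolean.
    static boolean toAutoStartup(Object resolved) {
        if (resolved instanceof Boolean) {
            return (Boolean) resolved;
        }
        if (resolved instanceof String) {
            // parseBoolean returns true only for "true", ignoring case.
            return Boolean.parseBoolean((String) resolved);
        }
        throw new IllegalArgumentException("Unsupported autoStartup value: " + resolved);
    }

    public static void main(String[] args) {
        System.out.println(toAutoStartup("TRUE"));
        System.out.println(toAutoStartup("yes"));
        System.out.println(toAutoStartup(Boolean.FALSE));
    }
}
```

        Values such as "yes" or "1" therefore resolve to false.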

        Returns:
        true to auto start, false to not auto start.
        Since:
        2.2
        Default:
        ""
      • properties

        java.lang.String[] properties
        Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).

        Supported Syntax

        The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file:

        • key=value
        • key:value
        • key value
        group.id and client.id are ignored.
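
        The three separator forms can be checked with java.util.Properties, which defines the properties-file syntax. The helper below is a hypothetical sketch and the property values are illustrative; it only demonstrates how each entry splits into a key and a value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ListenerPropertiesDemo {

    // Hypothetical helper: parses each entry using Java properties-file rules.
    static Properties parse(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            try {
                props.load(new StringReader(entry));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "max.poll.records=50",           // key=value
                "fetch.min.bytes:1024",          // key:value
                "auto.offset.reset earliest");   // key value
        System.out.println(props.getProperty("max.poll.records"));
        System.out.println(props.getProperty("fetch.min.bytes"));
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

        The same strings would be passed to the annotation as, for example, properties = { "max.poll.records=50", "fetch.min.bytes:1024" }.
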
        Returns:
        the properties.
        Since:
        2.2.4
        See Also:
        ConsumerConfig, groupId(), clientIdPrefix()
        Default:
        {}
      • splitIterables

        boolean splitIterables
        When false and the return type is an Iterable, return the result as the value of a single reply record instead of individual records for each element. Default true. Ignored if the reply is of type Iterable<Message<?>>.
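
        The sketch below shows a listener that returns a List and sends it as one reply record. The class name, id and topic names are hypothetical. It assumes the container factory has a reply template configured and that the reply value serializer can handle a List.

```java
import java.util.Arrays;
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

// Hypothetical request/reply listener; topic names are illustrative only.
@Component
public class CsvSplittingListener {

    // With splitIterables = false the whole list becomes the value of one reply record;
    // with the default (true) each element would be sent as its own record.
    @KafkaListener(id = "csvSplitter", topics = "requests", splitIterables = false)
    @SendTo("replies")
    public List<String> handle(String csvLine) {
        return Arrays.asList(csvLine.split(","));
    }
}
```
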
        Returns:
        false to create a single reply record.
        Since:
        2.3.5
        Default:
        true