Kafka Binder Listener Container Customizers

Spring Cloud Stream lets you customize message listener containers by registering customizer beans. This section covers the three customizer interfaces available for the Kafka binder: ListenerContainerCustomizer, its Kafka-specific extension KafkaListenerContainerCustomizer, and the specialized ListenerContainerWithDlqAndRetryCustomizer.

ListenerContainerCustomizer

The ListenerContainerCustomizer is a generic interface in Spring Cloud Stream that allows customization of message listener containers.

Purpose

Use this customizer when you need to modify the behavior of the listener container and do not need the binding's extended Kafka consumer properties or its DLQ and retry settings.

Usage

To use the ListenerContainerCustomizer, create a bean that implements this interface in your configuration:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

The ListenerContainerCustomizer interface defines the following method:

void configure(C container, String destinationName, String group);
  • container: The message listener container to customize.

  • destinationName: The name of the destination (topic).

  • group: The consumer group ID.
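As a hedged illustration of these parameters, the following sketch applies a setting to one destination only. The class name ContainerCustomizerConfig, the bean name idleEventCustomizer, and the destination name "orders" are hypothetical and chosen only for this example; getContainerProperties() and setIdleEventInterval(...) come from Spring for Apache Kafka.

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
public class ContainerCustomizerConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> idleEventCustomizer() {
        return (container, destinationName, group) -> {
            // "orders" is a hypothetical destination name used only for illustration.
            if ("orders".equals(destinationName)) {
                // Ask the container to publish an idle event after 60 seconds without records.
                container.getContainerProperties().setIdleEventInterval(60_000L);
            }
        };
    }
}
```

Branching on destinationName or group in this way keeps a single customizer bean while still applying different settings per binding.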

KafkaListenerContainerCustomizer

The KafkaListenerContainerCustomizer interface extends ListenerContainerCustomizer and additionally gives the customizer access to the binding-specific extended Kafka consumer properties.

Purpose

Use this customizer when you need to access the binding-specific extended Kafka consumer properties while customizing the listener container.

Usage

To use the KafkaListenerContainerCustomizer, create a bean that implements this interface in your configuration:

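@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return new KafkaListenerContainerCustomizer<>() {
        @Override // required: the only abstract method; intentionally a no-op here
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group) { }
        @Override
        public void configureKafkaListenerContainer(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
            // Customize the Kafka container here
        }
    };
}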

The KafkaListenerContainerCustomizer interface adds the following method:

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

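This method takes the same arguments as the base configure method plus one additional parameter. Because its default implementation simply delegates to configure, override it when you need the extended properties: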

  • extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.
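The delegation described above can be sketched in plain Java without any Spring dependencies. The two interfaces below are simplified stand-ins that mirror only the method shapes shown in this section; they are not the real Spring Cloud Stream types, and the List<String> "container" and String "properties" are placeholders chosen so the sketch can run on its own.

```java
import java.util.ArrayList;
import java.util.List;

class CustomizerDelegationSketch {

    /** Stand-in for the generic customizer contract (hypothetical, simplified). */
    interface ContainerCustomizer<C> {
        void configure(C container, String destinationName, String group);
    }

    /** Stand-in for the Kafka-specific extension; P is a placeholder for the extended properties type. */
    interface KafkaContainerCustomizer<C, P> extends ContainerCustomizer<C> {
        default void configureKafkaListenerContainer(C container, String destinationName, String group, P properties) {
            // Default behavior: ignore the properties and fall back to the base method.
            configure(container, destinationName, group);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // A lambda can only supply the single abstract method, the three-argument configure.
        KafkaContainerCustomizer<List<String>, String> lambdaStyle =
                (container, destination, group) -> container.add("configure:" + destination + ":" + group);

        // Overriding the default method is what exposes the properties argument.
        KafkaContainerCustomizer<List<String>, String> overriding = new KafkaContainerCustomizer<>() {
            @Override
            public void configure(List<String> container, String destination, String group) {
                container.add("configure:" + destination + ":" + group);
            }

            @Override
            public void configureKafkaListenerContainer(List<String> container, String destination,
                    String group, String properties) {
                container.add("kafka:" + destination + ":" + properties);
            }
        };

        // Both customizers are invoked through the four-argument method.
        lambdaStyle.configureKafkaListenerContainer(log, "orders", "billing", "enableDlq=false");
        overriding.configureKafkaListenerContainer(log, "orders", "billing", "enableDlq=false");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the lambda-based customizer never sees the properties because the call falls through to configure, while the overriding customizer receives them directly. The same reasoning suggests that a bean needing the extended properties should override the four-argument method rather than rely on a lambda.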

ListenerContainerWithDlqAndRetryCustomizer

The ListenerContainerWithDlqAndRetryCustomizer interface provides additional customization options for scenarios involving Dead Letter Queues (DLQ) and retry mechanisms.

Purpose

Use this customizer when you need to fine-tune DLQ behavior or implement custom retry logic for your Kafka consumers.

Usage

To use the ListenerContainerWithDlqAndRetryCustomizer, create a bean that implements this interface in your configuration:

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Customize the container here; the DLQ destination resolver, the back-off,
        // and the extended consumer binding properties are all available.
    };
}

The ListenerContainerWithDlqAndRetryCustomizer interface defines the following method:

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
  • container: The Kafka listener container to customize.

  • destinationName: The name of the destination (topic).

  • group: The consumer group ID.

  • dlqDestinationResolver: A function to resolve the DLQ destination for a failed record.

  • backOff: The backoff policy for retries.

  • extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.

Summary

  • ListenerContainerWithDlqAndRetryCustomizer is used if DLQ is enabled.

  • KafkaListenerContainerCustomizer is used for Kafka-specific customization without DLQ.

  • The base ListenerContainerCustomizer is used for generic customization.

This hierarchical approach allows for flexible and specific customization of your Kafka listener containers in Spring Cloud Stream applications.