Kafka Binder Listener Container Customizers
Spring Cloud Stream provides powerful customization options for message listener containers through the use of customizers.
This section covers the customizer interfaces available for Kafka: ListenerContainerCustomizer, its Kafka-specific extension KafkaListenerContainerCustomizer, and the specialized ListenerContainerWithDlqAndRetryCustomizer.
ListenerContainerCustomizer
The ListenerContainerCustomizer is a generic interface in Spring Cloud Stream that allows customization of message listener containers.
Usage
To use the ListenerContainerCustomizer, create a bean that implements this interface in your configuration:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}
The ListenerContainerCustomizer interface defines the following method:
void configure(C container, String destinationName, String group);
- container: The message listener container to customize.
- destinationName: The name of the destination (topic).
- group: The consumer group ID.
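The callback shape described above can be sketched without any Spring dependency. This is a minimal illustration under stated assumptions, not the library's implementation: the nested ListenerContainerCustomizer interface is a local stand-in that mirrors the signature shown above, and StubContainer, its ackMode field, and the "orders" destination are hypothetical names chosen for the example.

```java
class GenericCustomizerSketch {

    // Local stand-in mirroring the method signature shown above; not the Spring type.
    interface ListenerContainerCustomizer<C> {
        void configure(C container, String destinationName, String group);
    }

    // Hypothetical container with a single tunable setting, for illustration only.
    static class StubContainer {
        String ackMode = "BATCH";
    }

    // A customizer that changes the setting for one destination and leaves others untouched.
    static ListenerContainerCustomizer<StubContainer> ordersOnly() {
        return (container, destinationName, group) -> {
            if ("orders".equals(destinationName)) {
                container.ackMode = "RECORD";
            }
        };
    }

    // Applies the customizer to a fresh container and reports the resulting setting.
    static String ackModeFor(String destinationName, String group) {
        StubContainer container = new StubContainer();
        ordersOnly().configure(container, destinationName, group);
        return container.ackMode;
    }

    public static void main(String[] args) {
        System.out.println(ackModeFor("orders", "billing"));   // prints RECORD
        System.out.println(ackModeFor("payments", "billing")); // prints BATCH
    }
}
```

Because the destination name and group are passed on every call, a single customizer bean can apply different settings to different bindings.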
KafkaListenerContainerCustomizer
The KafkaListenerContainerCustomizer interface extends ListenerContainerCustomizer. In addition to customizing the listener container, it provides access to the binding-specific extended Kafka consumer properties.
Purpose
Use this customizer when you need to access the binding-specific extended Kafka consumer properties while customizing the listener container.
Usage
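To use the KafkaListenerContainerCustomizer, create a bean that implements this interface in your configuration:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    // A lambda implements the inherited three-argument configure method.
    // To receive the extended consumer properties, override
    // configureKafkaListenerContainer in a class instead.
    return (container, destinationName, group) -> {
        // Customize the Kafka container here
    };
}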
The KafkaListenerContainerCustomizer interface adds the following method:
default void configureKafkaListenerContainer(
        C container,
        String destinationName,
        String group,
        ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
    configure(container, destinationName, group);
}
This method extends the base configure method with an additional parameter:
- extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.
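Since configureKafkaListenerContainer is declared as a default method, the only abstract method on the interface is still the three-argument configure. The sketch below illustrates what that means for implementers. It uses local stand-in interfaces that copy the signatures shown above rather than the real Spring types, and StubProperties (with its enableDlq flag) is a hypothetical placeholder for ExtendedConsumerProperties<KafkaConsumerProperties>.

```java
import java.util.ArrayList;
import java.util.List;

class KafkaCustomizerDispatchSketch {

    // Local stand-ins mirroring the signatures shown above; not the Spring types.
    interface ListenerContainerCustomizer<C> {
        void configure(C container, String destinationName, String group);
    }

    // Hypothetical placeholder for the extended consumer properties.
    static class StubProperties {
        final boolean enableDlq;

        StubProperties(boolean enableDlq) {
            this.enableDlq = enableDlq;
        }
    }

    interface KafkaListenerContainerCustomizer<C> extends ListenerContainerCustomizer<C> {
        default void configureKafkaListenerContainer(C container, String destinationName, String group,
                StubProperties extendedConsumerProperties) {
            configure(container, destinationName, group);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A lambda implements the single abstract method, the three-argument configure.
        KafkaListenerContainerCustomizer<Object> viaLambda =
                (container, destinationName, group) -> log.add("configure:" + destinationName);

        // Overriding the default method gives access to the extended properties.
        KafkaListenerContainerCustomizer<Object> viaOverride = new KafkaListenerContainerCustomizer<Object>() {
            @Override
            public void configure(Object container, String destinationName, String group) {
                log.add("configure:" + destinationName);
            }

            @Override
            public void configureKafkaListenerContainer(Object container, String destinationName, String group,
                    StubProperties props) {
                log.add("kafka:" + destinationName + ":dlq=" + props.enableDlq);
            }
        };

        StubProperties props = new StubProperties(true);
        // The default implementation delegates to configure and ignores the properties.
        viaLambda.configureKafkaListenerContainer(new Object(), "orders", "billing", props);
        // The override sees the properties directly.
        viaOverride.configureKafkaListenerContainer(new Object(), "orders", "billing", props);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [configure:orders, kafka:orders:dlq=true]
    }
}
```

A lambda-based bean is therefore enough when the extended properties are not needed. A class that overrides the default method is one way to read them.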
ListenerContainerWithDlqAndRetryCustomizer
The ListenerContainerWithDlqAndRetryCustomizer interface provides additional customization options for scenarios involving Dead Letter Queues (DLQ) and retry mechanisms.
Purpose
Use this customizer when you need to fine-tune DLQ behavior or implement custom retry logic for your Kafka consumers.
Usage
To use the ListenerContainerWithDlqAndRetryCustomizer, create a bean that implements this interface in your configuration:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Customize the container here; the extended consumer binding properties are also available.
    };
}
The ListenerContainerWithDlqAndRetryCustomizer interface defines the following method:
void configure(
        AbstractMessageListenerContainer<?, ?> container,
        String destinationName,
        String group,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        BackOff backOff,
        ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
- container: The Kafka listener container to customize.
- destinationName: The name of the destination (topic).
- group: The consumer group ID.
- dlqDestinationResolver: A function to resolve the DLQ destination for a failed record.
- backOff: The backoff policy for retries.
- extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.
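To make the role of dlqDestinationResolver more concrete, the following sketch shows a function with the same shape: it takes a failed record and the exception, and returns the topic and partition to publish to. It is an illustration under stated assumptions only. StubRecord and StubTopicPartition are hypothetical stand-ins for Kafka's ConsumerRecord and TopicPartition, and the ".invalid" and ".dlq" suffixes are invented for the example rather than binder defaults.

```java
import java.util.function.BiFunction;

class DlqResolverSketch {

    // Hypothetical stand-in for Kafka's ConsumerRecord (only topic and partition are modeled).
    static final class StubRecord {
        final String topic;
        final int partition;

        StubRecord(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical stand-in for Kafka's TopicPartition.
    static final class StubTopicPartition {
        final String topic;
        final int partition;

        StubTopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Routes records by failure type and keeps the original partition number.
    static BiFunction<StubRecord, Exception, StubTopicPartition> resolver() {
        return (record, exception) -> {
            String suffix = exception instanceof IllegalArgumentException ? ".invalid" : ".dlq";
            return new StubTopicPartition(record.topic + suffix, record.partition);
        };
    }

    // Convenience wrapper that resolves a destination and renders it as text.
    static String resolve(String topic, int partition, Exception failure) {
        return resolver().apply(new StubRecord(topic, partition), failure).toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve("orders", 2, new IllegalArgumentException("bad payload")));      // prints orders.invalid-2
        System.out.println(resolve("orders", 0, new IllegalStateException("downstream failure"))); // prints orders.dlq-0
    }
}
```

In a real customizer, the resolver and the BackOff received as arguments would typically be passed on to an error handler set on the container, for example Spring Kafka's DeadLetterPublishingRecoverer and DefaultErrorHandler. That wiring is not shown here.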
Summary
- ListenerContainerWithDlqAndRetryCustomizer is used if DLQ is enabled.
- KafkaListenerContainerCustomizer is used for Kafka-specific customization without DLQ.
- The base ListenerContainerCustomizer is used for generic customization.
This hierarchical approach allows for flexible and specific customization of your Kafka listener containers in Spring Cloud Stream applications.