K
- the key type.V
- the value type.public class BatchMessagingMessageListenerAdapter<K,V> extends MessagingMessageListenerAdapter<K,V> implements BatchAcknowledgingConsumerAwareMessageListener<K,V>
MessageListener
adapter that invokes a configurable HandlerAdapter
; used when the factory is
configured for the listener to receive batches of messages.
Wraps the incoming Kafka Message to Spring's Message
abstraction.
The original List<ConsumerRecord>
and
the Acknowledgment
are provided as additional arguments so that these can
be injected as method arguments if necessary.
MessagingMessageListenerAdapter.ReplyExpressionRoot
ConsumerSeekAware.ConsumerSeekCallback
logger, NULL_MESSAGE
Constructor and Description |
---|
BatchMessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method) |
BatchMessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method,
KafkaListenerErrorHandler errorHandler) |
Modifier and Type | Method and Description |
---|---|
protected BatchMessageConverter |
getBatchMessageConverter()
Return the
BatchMessagingMessageConverter for this listener,
being able to convert Message . |
protected void |
invoke(java.lang.Object records,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer,
org.springframework.messaging.Message<?> messageArg) |
void |
onMessage(org.apache.kafka.clients.consumer.ConsumerRecords<K,V> records,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
Listener receives the original
ConsumerRecords object instead of a
list of ConsumerRecord . |
void |
onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> records,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Kafka
MessageListener entry point. |
void |
setBatchMessageConverter(BatchMessageConverter messageConverter)
Set the BatchMessageConverter.
|
protected org.springframework.messaging.Message<?> |
toMessagingMessage(java.util.List records,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer) |
boolean |
wantsPollResult()
Return true if this listener wishes to receive the original
ConsumerRecords
object instead of a list of ConsumerRecord . |
createMessagingErrorMessage, determineInferredType, getMessageConverter, getReplyHeadersConfigurer, getType, handleResult, invokeHandler, isConsumerRecordList, isConsumerRecords, isConversionNeeded, isMessageList, isSplitIterables, onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, sendResponse, sendResponse, setBeanResolver, setFallbackType, setHandlerMethod, setMessageConverter, setReplyHeadersConfigurer, setReplyTemplate, setReplyTopic, setSplitIterables, toMessagingMessage
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
onMessage
onMessage, onMessage
public BatchMessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method)
public BatchMessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method, KafkaListenerErrorHandler errorHandler)
public void setBatchMessageConverter(BatchMessageConverter messageConverter)
messageConverter
- the converter.protected final BatchMessageConverter getBatchMessageConverter()
BatchMessagingMessageConverter
for this listener,
being able to convert Message
.BatchMessagingMessageConverter
for this listener,
being able to convert Message
.public boolean wantsPollResult()
BatchMessageListener
ConsumerRecords
object instead of a list of ConsumerRecord
.wantsPollResult
in interface BatchMessageListener<K,V>
public void onMessage(org.apache.kafka.clients.consumer.ConsumerRecords<K,V> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
BatchMessageListener
ConsumerRecords
object instead of a
list of ConsumerRecord
.onMessage
in interface BatchMessageListener<K,V>
records
- the records.acknowledgment
- the acknowledgment (null if not manual acks)consumer
- the consumer.public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
MessageListener
entry point.
Delegate the message to the target listener method, with appropriate conversion of the message argument.
onMessage
in interface BatchAcknowledgingConsumerAwareMessageListener<K,V>
onMessage
in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>>
records
- the incoming list of Kafka ConsumerRecord
.acknowledgment
- the acknowledgment.consumer
- the consumer.protected void invoke(java.lang.Object records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, org.springframework.messaging.Message<?> messageArg)
protected org.springframework.messaging.Message<?> toMessagingMessage(java.util.List records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)