Type Parameters:
K - the key type.
V - the value type.
public abstract class MessagingMessageListenerAdapter<K,V> extends java.lang.Object implements ConsumerSeekAware
A MessageListener adapter providing the necessary infrastructure to extract the payload of a Message.
Nested classes/interfaces inherited from interface ConsumerSeekAware: ConsumerSeekAware.ConsumerSeekCallback
Modifier and Type | Field and Description |
---|---|
protected org.apache.commons.logging.Log | logger |
Constructor and Description |
---|
MessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method) |
Modifier and Type | Method and Description |
---|---|
protected java.lang.reflect.Type | determineInferredType(java.lang.reflect.Method method): Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion. |
protected RecordMessageConverter | getMessageConverter(): Return the MessagingMessageConverter for this listener, being able to convert Message. |
protected java.lang.reflect.Type | getType(): Returns the inferred type for conversion or, if null, the fallbackType. |
protected java.lang.Object | invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message): Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message. |
protected boolean | isConsumerRecordList() |
protected boolean | isMessageList() |
void | onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback): If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation. |
void | onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback): When using group management, called when partition assignments change. |
void | registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback): Register the callback to use when seeking at some arbitrary time. |
void | setFallbackType(java.lang.Class<?> fallbackType): Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. |
void | setHandlerMethod(HandlerAdapter handlerMethod): Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord. |
void | setMessageConverter(RecordMessageConverter messageConverter): Set the MessageConverter. |
protected org.springframework.messaging.Message<?> | toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment) |
public MessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method)
public void setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
Parameters:
messageConverter - the converter.
protected final RecordMessageConverter getMessageConverter()
Return the MessagingMessageConverter for this listener, being able to convert Message.
Returns:
the MessagingMessageConverter for this listener, being able to convert Message.
protected java.lang.reflect.Type getType()
Returns the inferred type for conversion or, if null, the fallbackType.
public void setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. An example of a type-aware message converter is the StringJsonMessageConverter. Defaults to Object.
Parameters:
fallbackType - the type.
public void setHandlerMethod(HandlerAdapter handlerMethod)
Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord.
Parameters:
handlerMethod - HandlerAdapter instance.
protected boolean isConsumerRecordList()
protected boolean isMessageList()
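The fallback rule documented for getType() and setFallbackType(java.lang.Class<?>) can be illustrated with a minimal sketch. FallbackTypeSketch is a hypothetical stand-in written only for this illustration, not the adapter's source; it assumes nothing beyond what the descriptions state: the inferred type wins when present, otherwise the fallback type is returned, and the fallback defaults to Object.

```java
import java.lang.reflect.Type;

// Hypothetical stand-in for the adapter's type bookkeeping (illustration only).
final class FallbackTypeSketch {

    // null models the case where the type could not be inferred from the method
    private final Type inferredType;

    // documented default for the fallback type
    private Type fallbackType = Object.class;

    FallbackTypeSketch(Type inferredType) {
        this.inferredType = inferredType;
    }

    void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    // Returns the inferred type or, if it is null, the fallback type.
    Type getType() {
        return this.inferredType != null ? this.inferredType : this.fallbackType;
    }

    public static void main(String[] args) {
        FallbackTypeSketch unknown = new FallbackTypeSketch(null);
        System.out.println(unknown.getType());   // fallback in effect
        unknown.setFallbackType(String.class);
        System.out.println(unknown.getType());   // custom fallback in effect

        FallbackTypeSketch known = new FallbackTypeSketch(Integer.class);
        known.setFallbackType(String.class);
        System.out.println(known.getType());     // inferred type takes precedence
    }
}
```

The fallback only matters to a type-aware converter; a converter that ignores the target type would be unaffected by it.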
public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
Register the callback to use when seeking at some arbitrary time. When used with a ConcurrentMessageListenerContainer or the same listener instance in multiple containers, listeners should store the callback in a ThreadLocal.
Specified by:
registerSeekCallback in interface ConsumerSeekAware
Parameters:
callback - the callback.
public void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
When using group management, called when partition assignments change.
Specified by:
onPartitionsAssigned in interface ConsumerSeekAware
Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform an initial seek after assignment.
public void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation.
Specified by:
onIdleContainer in interface ConsumerSeekAware
Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform a seek.
protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment)
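The advice given for registerSeekCallback, to keep the callback in a ThreadLocal when one listener instance serves several consumer threads, can be sketched as follows. To stay runnable without Spring Kafka on the classpath, the sketch uses a hypothetical SeekCallback interface in place of ConsumerSeekAware.ConsumerSeekCallback, and SharedListener, seekTo and runConsumer are illustrative names only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SeekCallbackSketch {

    /** Hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback. */
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    /** One listener instance shared by several simulated consumer threads. */
    static final class SharedListener {

        private final ThreadLocal<SeekCallback> callback = new ThreadLocal<>();

        // Called on each consumer thread; every thread keeps its own callback.
        void registerSeekCallback(SeekCallback cb) {
            this.callback.set(cb);
        }

        // Seeks through the callback registered by the calling thread.
        void seekTo(String topic, int partition, long offset) {
            SeekCallback cb = this.callback.get();
            if (cb == null) {
                throw new IllegalStateException("No callback registered on this thread");
            }
            cb.seek(topic, partition, offset);
        }
    }

    /** Runs one simulated consumer thread and returns the seeks its callback received. */
    static List<String> runConsumer(SharedListener listener, String topic, int partition, long offset) {
        List<String> seen = new CopyOnWriteArrayList<>();
        Thread consumer = new Thread(() -> {
            listener.registerSeekCallback((t, p, o) -> seen.add(t + "-" + p + "@" + o));
            listener.seekTo(topic, partition, offset);
        });
        consumer.start();
        try {
            consumer.join();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumer", ex);
        }
        return seen;
    }

    public static void main(String[] args) {
        SharedListener listener = new SharedListener();
        System.out.println(runConsumer(listener, "orders", 0, 42L));
        System.out.println(runConsumer(listener, "orders", 1, 7L));
    }
}
```

With a plain field instead of the ThreadLocal, the last thread to register would overwrite the callback for every other thread, and a seek could be issued against the wrong consumer.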
protected final java.lang.Object invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message)
Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message.
Parameters:
data - the data to process during invocation.
acknowledgment - the acknowledgment to use if any.
message - the message to process.
protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.
Parameters:
method - the method.
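As an illustration of the kind of mechanism determineInferredType(java.lang.reflect.Method) might use, the following sketch infers a payload type from a handler method's generic parameter using only java.lang.reflect. It is a simplified heuristic under stated assumptions, not the adapter's actual algorithm: Envelope is a hypothetical stand-in for a messaging Message wrapper, Handlers is a hypothetical listener bean, and any method with more than one parameter is treated as ambiguous.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

final class InferredTypeSketch {

    /** Hypothetical envelope type standing in for a messaging Message<T>. */
    interface Envelope<T> {
        T payload();
    }

    /** Hypothetical listener bean with differently shaped handler methods. */
    static final class Handlers {
        void plain(String payload) { }
        void wrapped(Envelope<Integer> message) { }
        void batch(List<String> payloads) { }
        void two(String key, String value) { }
    }

    /** Simplified heuristic: returns null when the payload type is ambiguous. */
    static Type determineInferredType(Method method) {
        Type[] params = method.getGenericParameterTypes();
        if (params.length != 1) {
            return null; // a caller would then rely on its fallback type
        }
        Type param = params[0];
        if (param instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) param;
            if (parameterized.getRawType() == Envelope.class) {
                // unwrap Envelope<T> to T, the type the payload is converted to
                return parameterized.getActualTypeArguments()[0];
            }
        }
        return param; // plain or otherwise generic parameter is used as-is
    }

    /** Looks up a Handlers method by name and infers its payload type. */
    static Type inferFor(String name, Class<?>... parameterTypes) {
        try {
            return determineInferredType(Handlers.class.getDeclaredMethod(name, parameterTypes));
        }
        catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No such handler method: " + name, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(inferFor("plain", String.class));
        System.out.println(inferFor("wrapped", Envelope.class));
        System.out.println(inferFor("batch", List.class));
        System.out.println(inferFor("two", String.class, String.class));
    }
}
```

A subclass overriding the real method could apply a different rule, for example reading the target type from an annotation, and return null to let the fallback type apply.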