K
- the key type.V
- the value type.public abstract class MessagingMessageListenerAdapter<K,V> extends java.lang.Object implements ConsumerSeekAware
MessageListener
adapter
providing the necessary infrastructure to extract the payload of a
Message
.Modifier and Type | Class and Description |
---|---|
static class |
MessagingMessageListenerAdapter.ReplyExpressionRoot
Root object for reply expression evaluation.
|
ConsumerSeekAware.ConsumerSeekCallback
Modifier and Type | Field and Description |
---|---|
protected org.springframework.core.log.LogAccessor |
logger |
protected static org.springframework.messaging.Message<KafkaNull> |
NULL_MESSAGE
Message used when no conversion is needed.
|
Constructor and Description |
---|
MessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method) |
Modifier and Type | Method and Description |
---|---|
protected java.lang.String |
createMessagingErrorMessage(java.lang.String description,
java.lang.Object payload) |
protected java.lang.reflect.Type |
determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine
the target type of the payload conversion.
|
protected RecordMessageConverter |
getMessageConverter()
Return the
MessagingMessageConverter for this listener,
being able to convert Message . |
protected ReplyHeadersConfigurer |
getReplyHeadersConfigurer()
Return the reply configurer.
|
protected java.lang.reflect.Type |
getType()
Returns the inferred type for conversion or, if null, the
fallbackType . |
protected void |
handleResult(java.lang.Object resultArg,
java.lang.Object request,
java.lang.Object source)
Handle the given result object returned from the listener method, sending a
response message to the SendTo topic.
|
protected java.lang.Object |
invokeHandler(java.lang.Object data,
Acknowledgment acknowledgment,
org.springframework.messaging.Message<?> message,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Invoke the handler, wrapping any exception to a
ListenerExecutionFailedException
with a dedicated error message. |
protected boolean |
isConsumerRecordList() |
boolean |
isConsumerRecords() |
boolean |
isConversionNeeded() |
protected boolean |
isMessageList() |
protected boolean |
isSplitIterables()
When true,
Iterable return results will be split into discrete records. |
void |
onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
If the container is configured to emit idle container events, this method is called
when the container idle event is emitted - allowing a seek operation.
|
void |
onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
When using group management, called when partition assignments change.
|
void |
onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
When using group management, called when partition assignments are revoked.
|
void |
registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Register the callback to use when seeking at some arbitrary time.
|
protected void |
sendResponse(java.lang.Object result,
java.lang.String topic,
java.lang.Object source,
boolean returnTypeMessage)
Send the result to the topic.
|
void |
setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
Set a bean resolver for runtime SpEL expressions.
|
void |
setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this
adapter cannot determine the inferred type from the method.
|
void |
setHandlerMethod(HandlerAdapter handlerMethod)
Set the
HandlerAdapter to use to invoke the method
processing an incoming ConsumerRecord . |
void |
setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
|
void |
setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
|
void |
setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the template to use to send any result from the method invocation.
|
void |
setReplyTopic(java.lang.String replyTopicParam)
Set the topic to which to send any result from the method invocation.
|
void |
setSplitIterables(boolean splitIterables)
Set to false to disable splitting
Iterable reply values into separate
records. |
protected org.springframework.messaging.Message<?> |
toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
unregisterSeekCallback
protected static final org.springframework.messaging.Message<KafkaNull> NULL_MESSAGE
protected final org.springframework.core.log.LogAccessor logger
public MessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method)
public void setMessageConverter(RecordMessageConverter messageConverter)
messageConverter
- the converter.protected final RecordMessageConverter getMessageConverter()
MessagingMessageConverter
for this listener,
being able to convert Message
.MessagingMessageConverter
for this listener,
being able to convert Message
.protected java.lang.reflect.Type getType()
fallbackType
.public void setFallbackType(java.lang.Class<?> fallbackType)
StringJsonMessageConverter
. Defaults to
Object
.fallbackType
- the type.public void setHandlerMethod(HandlerAdapter handlerMethod)
HandlerAdapter
to use to invoke the method
processing an incoming ConsumerRecord
.handlerMethod
- HandlerAdapter
instance.protected boolean isConsumerRecordList()
public boolean isConsumerRecords()
public boolean isConversionNeeded()
public void setReplyTopic(java.lang.String replyTopicParam)
!{...}
evaluated at runtime.replyTopicParam
- the topic or expression.public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
replyTemplate
- the template.public void setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
beanResolver
- the resolver.protected boolean isMessageList()
protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
setReplyHeadersConfigurer(ReplyHeadersConfigurer)
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
replyHeadersConfigurer
- the configurer.protected boolean isSplitIterables()
Iterable
return results will be split into discrete records.public void setSplitIterables(boolean splitIterables)
Iterable
reply values into separate
records.splitIterables
- false to disable; default true.public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAware
ConcurrentMessageListenerContainer
or the same listener instance in multiple
containers listeners should store the callback in a ThreadLocal
.registerSeekCallback
in interface ConsumerSeekAware
callback
- the callback.public void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAware
onPartitionsAssigned
in interface ConsumerSeekAware
assignments
- the new assignments and their current offsets.callback
- the callback to perform an initial seek after assignment.public void onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
ConsumerSeekAware
ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)
on this thread.onPartitionsRevoked
in interface ConsumerSeekAware
partitions
- the partitions that have been revoked.public void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAware
onIdleContainer
in interface ConsumerSeekAware
assignments
- the new assignments and their current offsets.callback
- the callback to perform a seek.protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
protected final java.lang.Object invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
ListenerExecutionFailedException
with a dedicated error message.data
- the data to process during invocation.acknowledgment
- the acknowledgment to use if any.message
- the message to process.consumer
- the consumer.protected void handleResult(java.lang.Object resultArg, java.lang.Object request, java.lang.Object source)
resultArg
- the result object to handle (never null
)request
- the original request messagesource
- the source data for the method invocation - e.g.
o.s.messaging.Message<?>
; may be nullprotected void sendResponse(java.lang.Object result, java.lang.String topic, @Nullable java.lang.Object source, boolean returnTypeMessage)
result
- the result.topic
- the topic.source
- the source (input).returnTypeMessage
- true if we are returning message(s).protected final java.lang.String createMessagingErrorMessage(java.lang.String description, java.lang.Object payload)
protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
method
- the method.