Class MessagingMessageListenerAdapter<K,V>

java.lang.Object
org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
ConsumerSeekAware
Direct Known Subclasses:
BatchMessagingMessageListenerAdapter, RecordMessagingMessageListenerAdapter

public abstract class MessagingMessageListenerAdapter<K,V> extends Object implements ConsumerSeekAware
An abstract MessageListener adapter providing the necessary infrastructure to extract the payload of a Message.
Author:
Stephane Nicoll, Gary Russell, Artem Bilan, Venil Noronha, Nathan Xu, Wang ZhiYang
  • Field Details

    • NULL_MESSAGE

      protected static final Message<KafkaNull> NULL_MESSAGE
      Message used when no conversion is needed.
    • logger

      protected final LogAccessor logger
  • Constructor Details

    • MessagingMessageListenerAdapter

      protected MessagingMessageListenerAdapter(Object bean, Method method)
      Create an instance with the provided bean and method.
      Parameters:
      bean - the bean.
      method - the method.
  • Method Details

    • setCorrelationHeaderName

      public void setCorrelationHeaderName(String correlationHeaderName)
      Set a custom header name for the correlation id. Defaults to KafkaHeaders.CORRELATION_ID. This header is echoed back in any reply message.
      Parameters:
      correlationHeaderName - the header name.
      Since:
      3.0
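The echo behaviour described for setCorrelationHeaderName can be pictured with a minimal JDK-only sketch. It is not the adapter's implementation: the class CorrelationEchoSketch, the method replyHeaders, the use of a plain Map for headers, and the header name "x-correlation-id" are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Spring for Apache Kafka.
final class CorrelationEchoSketch {

    // Builds reply headers that carry over only the configured correlation header, if present.
    static Map<String, byte[]> replyHeaders(Map<String, byte[]> requestHeaders, String correlationHeaderName) {
        Map<String, byte[]> reply = new HashMap<>();
        byte[] correlation = requestHeaders.get(correlationHeaderName);
        if (correlation != null) {
            reply.put(correlationHeaderName, correlation);
        }
        return reply;
    }

    public static void main(String[] args) {
        Map<String, byte[]> request = new HashMap<>();
        request.put("x-correlation-id", new byte[] {1, 2, 3}); // assumed custom header name
        request.put("unrelated", new byte[] {9});
        Map<String, byte[]> reply = replyHeaders(request, "x-correlation-id");
        System.out.println("Reply header names: " + reply.keySet());
    }
}
```

Only the header whose name matches the configured value is copied, which is why a requester that uses a non-default header name needs the adapter configured with the same name.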
    • setMessageConverter

      public void setMessageConverter(RecordMessageConverter messageConverter)
      Set the MessageConverter.
      Parameters:
      messageConverter - the converter.
    • getMessageConverter

      protected final RecordMessageConverter getMessageConverter()
      Return the message converter for this listener, which is able to convert Message instances.
      Returns:
      the message converter for this listener.
    • setMessagingConverter

      public void setMessagingConverter(SmartMessageConverter messageConverter)
      Set the SmartMessageConverter to use with the default MessagingMessageConverter. Not allowed when a custom messageConverter is provided.
      Parameters:
      messageConverter - the converter.
      Since:
      2.7.1
    • getType

      protected Type getType()
      Returns the inferred type for conversion or, if null, the fallbackType.
      Returns:
      the type.
    • setFallbackType

      public void setFallbackType(Class<?> fallbackType)
      Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. An example of a type-aware message converter is the StringJsonMessageConverter. Defaults to Object.
      Parameters:
      fallbackType - the type.
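The relationship between getType and setFallbackType can be sketched as follows. This is a simplified JDK-only model, assuming a hypothetical class TypeResolutionSketch; it mirrors only the documented rule (inferred type if available, otherwise the fallback, which defaults to Object) and not the adapter's actual code.

```java
import java.lang.reflect.Type;

// Hypothetical illustration only; not part of Spring for Apache Kafka.
final class TypeResolutionSketch {

    private final Type inferredType;              // null when nothing could be inferred from the method
    private Class<?> fallbackType = Object.class; // documented default

    TypeResolutionSketch(Type inferredType) {
        this.inferredType = inferredType;
    }

    void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    // Returns the inferred type, or the fallback type when no type was inferred.
    Type getType() {
        return this.inferredType != null ? this.inferredType : this.fallbackType;
    }

    public static void main(String[] args) {
        TypeResolutionSketch noInference = new TypeResolutionSketch(null);
        noInference.setFallbackType(String.class);
        System.out.println(noInference.getType());

        TypeResolutionSketch inferred = new TypeResolutionSketch(Integer.class);
        inferred.setFallbackType(String.class);
        System.out.println(inferred.getType());
    }
}
```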
    • setHandlerMethod

      public void setHandlerMethod(HandlerAdapter handlerMethod)
      Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord.
      Parameters:
      handlerMethod - HandlerAdapter instance.
    • isConsumerRecordList

      protected boolean isConsumerRecordList()
    • isConsumerRecords

      public boolean isConsumerRecords()
    • isConversionNeeded

      public boolean isConversionNeeded()
    • setReplyTopic

      public void setReplyTopic(String replyTopicParam)
      Set the topic to which to send any result from the method invocation. May be a SpEL expression !{...} evaluated at runtime.
      Parameters:
      replyTopicParam - the topic or expression.
      Since:
      2.0
    • setReplyTemplate

      public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
      Set the template to use to send any result from the method invocation.
      Parameters:
      replyTemplate - the template.
      Since:
      2.0
    • setBeanResolver

      public void setBeanResolver(BeanResolver beanResolver)
      Set a bean resolver for runtime SpEL expressions. Also configures the evaluation context with a standard type converter and map accessor.
      Parameters:
      beanResolver - the resolver.
      Since:
      2.0
    • isMessageList

      protected boolean isMessageList()
    • getReplyHeadersConfigurer

      protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
      Return the reply configurer.
      Returns:
      the configurer.
      Since:
      2.2
    • setReplyHeadersConfigurer

      public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
      Set a configurer which will be invoked when creating a reply message.
      Parameters:
      replyHeadersConfigurer - the configurer.
      Since:
      2.2
    • isSplitIterables

      protected boolean isSplitIterables()
      When true, Iterable return results will be split into discrete records.
      Returns:
      true to split.
      Since:
      2.3.5
    • setSplitIterables

      public void setSplitIterables(boolean splitIterables)
      Set to false to disable splitting Iterable reply values into separate records.
      Parameters:
      splitIterables - false to disable; default true.
      Since:
      2.3.5
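The effect of the splitIterables flag can be illustrated with a small JDK-only sketch. The class SplitIterablesSketch and the method toReplyValues are hypothetical names; the sketch shows only the documented idea that an Iterable result becomes one reply value per element when splitting is enabled, and a single reply value otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Spring for Apache Kafka.
final class SplitIterablesSketch {

    // Returns the values that would each be sent as a separate reply record.
    static List<Object> toReplyValues(Object result, boolean splitIterables) {
        List<Object> values = new ArrayList<>();
        if (splitIterables && result instanceof Iterable<?>) {
            for (Object element : (Iterable<?>) result) {
                values.add(element); // one record per element
            }
        }
        else {
            values.add(result);      // the whole result as a single record
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> result = Arrays.asList("a", "b", "c");
        System.out.println("split:   " + toReplyValues(result, true).size() + " record(s)");
        System.out.println("unsplit: " + toReplyValues(result, false).size() + " record(s)");
    }
}
```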
    • registerSeekCallback

      public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
      Description copied from interface: ConsumerSeekAware
      Register the callback to use when seeking at some arbitrary time. When used with a ConcurrentMessageListenerContainer, or with the same listener instance in multiple containers, listeners should store the callback in a ThreadLocal or a map keyed by the thread.
      Specified by:
      registerSeekCallback in interface ConsumerSeekAware
      Parameters:
      callback - the callback.
    • onPartitionsAssigned

      public void onPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition,Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
      Description copied from interface: ConsumerSeekAware
      When using group management, called when partition assignments change.
      Specified by:
      onPartitionsAssigned in interface ConsumerSeekAware
      Parameters:
      assignments - the new assignments and their current offsets.
      callback - the callback to perform an initial seek after assignment.
    • onPartitionsRevoked

      public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Description copied from interface: ConsumerSeekAware
      When using group management, called when partition assignments are revoked. Listeners should discard any callback saved from ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback) on this thread.
      Specified by:
      onPartitionsRevoked in interface ConsumerSeekAware
      Parameters:
      partitions - the partitions that have been revoked.
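The advice above about storing the callback per thread and discarding it on revocation can be sketched with the JDK alone. SeekCallbackRegistrySketch and its nested SeekCallback interface are hypothetical stand-ins for a ConsumerSeekAware implementation and ConsumerSeekAware.ConsumerSeekCallback; a real listener would implement the Spring interface instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not part of Spring for Apache Kafka.
final class SeekCallbackRegistrySketch {

    // Stand-in for ConsumerSeekAware.ConsumerSeekCallback (assumption for this sketch).
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // One callback per consumer thread, so concurrent containers do not overwrite each other.
    private final Map<Thread, SeekCallback> callbacks = new ConcurrentHashMap<>();

    void registerSeekCallback(SeekCallback callback) {
        this.callbacks.put(Thread.currentThread(), callback);
    }

    // Discards the callback saved on this thread, as recommended on revocation.
    void onPartitionsRevoked() {
        this.callbacks.remove(Thread.currentThread());
    }

    SeekCallback callbackForCurrentThread() {
        return this.callbacks.get(Thread.currentThread());
    }

    public static void main(String[] args) {
        SeekCallbackRegistrySketch registry = new SeekCallbackRegistrySketch();
        registry.registerSeekCallback((topic, partition, offset) ->
                System.out.println("seek " + topic + "-" + partition + " to " + offset));
        registry.callbackForCurrentThread().seek("demo-topic", 0, 42L); // hypothetical topic name
        registry.onPartitionsRevoked();
        System.out.println("callback present after revoke: " + (registry.callbackForCurrentThread() != null));
    }
}
```

A map keyed by thread is used here rather than a ThreadLocal so that the stored callbacks can also be inspected from other threads; either approach fits the guidance in the description.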
    • onIdleContainer

      public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition,Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
      Description copied from interface: ConsumerSeekAware
      If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation.
      Specified by:
      onIdleContainer in interface ConsumerSeekAware
      Parameters:
      assignments - the new assignments and their current offsets.
      callback - the callback to perform a seek.
    • toMessagingMessage

      protected Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> cRecord, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
    • invokeHandler

      protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
      Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message.
      Parameters:
      data - the data to process during invocation.
      acknowledgment - the acknowledgment to use if any.
      message - the message to process.
      consumer - the consumer.
      Returns:
      the result of invocation.
    • handleResult

      protected void handleResult(Object resultArg, Object request, Object source)
      Handle the given result object returned from the listener method, sending a response message to the SendTo topic.
      Parameters:
      resultArg - the result object to handle (never null)
      request - the original request message
      source - the source data for the method invocation, for example org.springframework.messaging.Message<?>; may be null
    • sendResponse

      protected void sendResponse(Object result, String topic, @Nullable Object source, boolean returnTypeMessage)
      Send the result to the topic.
      Parameters:
      result - the result.
      topic - the topic.
      source - the source (input).
      returnTypeMessage - true if we are returning message(s).
      Since:
      2.1.3
    • createMessagingErrorMessage

      protected final String createMessagingErrorMessage(String description, Object payload)
    • determineInferredType

      protected Type determineInferredType(Method method)
      Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.
      Parameters:
      method - the method.
      Returns:
      the type.
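As a rough idea of what a "different mechanism" for determineInferredType might look like, the following JDK-only sketch derives a payload type from a method's first parameter using reflection, unwrapping List<T> to T. It is an assumption-laden simplification: the class InferredTypeSketch, the helper find, and the rule itself are hypothetical and much narrower than whatever the adapter does by default.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical illustration only; not part of Spring for Apache Kafka.
final class InferredTypeSketch {

    // Uses the first parameter as the payload type; List<T> is unwrapped to T.
    static Type determineInferredType(Method method) {
        Type[] parameterTypes = method.getGenericParameterTypes();
        if (parameterTypes.length == 0) {
            return null; // nothing to infer; a caller would fall back to another type
        }
        Type first = parameterTypes[0];
        if (first instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) first;
            if (parameterized.getRawType() == List.class) {
                return parameterized.getActualTypeArguments()[0];
            }
        }
        return first;
    }

    // Listener-style methods used only for demonstration.
    static void single(Integer value) { }

    static void batch(List<String> values) { }

    // Looks up one of the demonstration methods by name without checked exceptions.
    static Method find(String name) {
        for (Method candidate : InferredTypeSketch.class.getDeclaredMethods()) {
            if (candidate.getName().equals(name)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No method named " + name);
    }

    public static void main(String[] args) {
        System.out.println(determineInferredType(find("single")));
        System.out.println(determineInferredType(find("batch")));
    }
}
```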