Class KafkaMessageDrivenChannelAdapter<K,V>

Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, OrderlyShutdownCapable, MessageProducer, Pausable, IntegrationPattern, KafkaInboundEndpoint, NamedComponent, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent

public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable
Message-driven channel adapter.
Since:
5.4
Author:
Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
  • Constructor Details

    • KafkaMessageDrivenChannelAdapter

      public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
      Parameters:
      messageListenerContainer - the container.
    • KafkaMessageDrivenChannelAdapter

      public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
      Construct an instance with the provided mode.
      Parameters:
      messageListenerContainer - the container.
      mode - the mode.
  • Method Details

    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
      Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode.
      Parameters:
      messageConverter - the converter.
    • setRecordMessageConverter

      public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set the message converter to use with a record-based consumer.
      Parameters:
      messageConverter - the converter.
    • setBatchMessageConverter

      public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
      Set the message converter to use with a batch-based consumer.
      Parameters:
      messageConverter - the converter.
    • setRecordFilterStrategy

      public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
      Specify a RecordFilterStrategy to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into FilteringMessageListenerAdapter.
      Parameters:
      recordFilterStrategy - the RecordFilterStrategy to use.
    • setAckDiscarded

      public void setAckDiscarded(boolean ackDiscarded)
      A boolean flag to indicate if FilteringMessageListenerAdapter should acknowledge discarded records or not. Does not make sense if setRecordFilterStrategy(RecordFilterStrategy) isn't specified.
      Parameters:
      ackDiscarded - true to ack (commit offset for) discarded messages.
    • setRetryTemplate

      public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
      Specify a RetryTemplate instance to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into RetryingMessageListenerAdapter.

      IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the max.poll.interval.ms. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.

      Parameters:
      retryTemplate - the RetryTemplate to use.
    • setRecoveryCallback

      public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
      A RecoveryCallback instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only used if a setRetryTemplate(RetryTemplate) is specified. Default is an ErrorMessageSendingRecoverer if an error channel has been provided. Set to null if you wish to throw the exception back to the container after retries are exhausted.
      Parameters:
      recoveryCallback - the recovery callback.
    • setFilterInRetry

      public void setFilterInRetry(boolean filterInRetry)
      The boolean flag to specify the order how RetryingMessageListenerAdapter and FilteringMessageListenerAdapter are wrapped to each other, if both of them are present. Does not make sense if only one of RetryTemplate or RecordFilterStrategy is present, or any. When true, the filter is called for each retry; when false, the filter is only called once for each delivery from the container.
      Parameters:
      filterInRetry - the order for RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrapping. Defaults to false.
    • setPayloadType

      public void setPayloadType(Class<?> payloadType)
      When using a type-aware message converter such as StringJsonMessageConverter, set the payload type the converter should create. Defaults to Object.
      Parameters:
      payloadType - the type.
    • setOnPartitionsAssignedSeekCallback

      public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
      Specify a BiConsumer for seeks management during ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. This is called from the internal MessagingMessageListenerAdapter implementation.
      Parameters:
      onPartitionsAssignedCallback - the BiConsumer to use
      See Also:
      • ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
      Parameters:
      bindSourceRecord - true to bind.
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class IntegrationObjectSupport
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class MessageProducerSupport
    • doStart

      protected void doStart()
      Description copied from class: MessageProducerSupport
      Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
      Overrides:
      doStart in class MessageProducerSupport
    • doStop

      protected void doStop()
      Description copied from class: MessageProducerSupport
      Take no action by default. Subclasses may override this if they need lifecycle-managed behavior.
      Overrides:
      doStop in class MessageProducerSupport
    • pause

      public void pause()
      Description copied from interface: Pausable
      Pause the endpoint.
      Specified by:
      pause in interface Pausable
    • resume

      public void resume()
      Description copied from interface: Pausable
      Resume the endpoint if paused.
      Specified by:
      resume in interface Pausable
    • isPaused

      public boolean isPaused()
      Description copied from interface: Pausable
      Check if the endpoint is paused.
      Specified by:
      isPaused in interface Pausable
      Returns:
      true if paused.
    • beforeShutdown

      public int beforeShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.
      Specified by:
      beforeShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
    • afterShutdown

      public int afterShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.
      Specified by:
      afterShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
    • getErrorMessageAttributes

      protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
      Description copied from class: MessageProducerSupport
      Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
      Overrides:
      getErrorMessageAttributes in class MessageProducerSupport
      Parameters:
      message - the message.
      Returns:
      the attributes.