Class KafkaInboundGateway<K,V,R>

Type Parameters:
K - the key type.
V - the request value type.
R - the reply value type.
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ComponentSourceAware, ExpressionCapable, OrderlyShutdownCapable, Pausable, IntegrationPattern, KafkaInboundEndpoint, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent

public class KafkaInboundGateway<K,V,R> extends MessagingGatewaySupport implements KafkaInboundEndpoint, Pausable, OrderlyShutdownCapable
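Inbound gateway that receives request records through the provided message listener container and sends replies with the provided KafkaTemplate.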
Since:
5.4
Author:
Gary Russell, Artem Bilan, Urs Keller
  • Constructor Details

    • KafkaInboundGateway

      public KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K,R> kafkaTemplate)
      Construct an instance with the provided container.
      Parameters:
      messageListenerContainer - the container.
      kafkaTemplate - the kafka template.
  • Method Details

    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set the message converter. As the signature shows, this gateway accepts only a RecordMessageConverter.
      Parameters:
      messageConverter - the converter.
    • setPayloadType

      public void setPayloadType(Class<?> payloadType)
      When using a type-aware message converter such as StringJsonMessageConverter, set the payload type the converter should create. Defaults to Object.
      Parameters:
      payloadType - the type.
    • setRetryTemplate

      public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
      Specify a RetryTemplate instance to use for retrying deliveries.

      IMPORTANT: This form of retry is blocking and can cause a rebalance if the aggregate retry delay across all polled records exceeds max.poll.interval.ms. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.

      Parameters:
      retryTemplate - the RetryTemplate to use.
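      The warning above can be made concrete with a little arithmetic. The following is a minimal, library-free sketch and not part of this API: the class RetryBudget and its methods are hypothetical names, and it assumes a fixed back-off between attempts, with no pause after the final attempt. It ignores the time spent in the listener itself, so real blocking time would be higher. The values 500 and 300000 are the Kafka consumer defaults for max.poll.records and max.poll.interval.ms.

```java
// Hypothetical helper (not part of Spring Integration): estimates how long a
// blocking retry could hold up one poll when every record exhausts its retries.
final class RetryBudget {

    /** Back-off time for one record: one pause between each pair of attempts. */
    static long perRecordDelayMs(int maxAttempts, long backOffMs) {
        return Math.max(0, maxAttempts - 1) * backOffMs;
    }

    /** Aggregate back-off time if every record from a single poll fails all attempts. */
    static long worstCasePollDelayMs(int polledRecords, int maxAttempts, long backOffMs) {
        return polledRecords * perRecordDelayMs(maxAttempts, backOffMs);
    }

    /** True if the aggregate delay alone would exceed max.poll.interval.ms. */
    static boolean risksRebalance(int polledRecords, int maxAttempts,
            long backOffMs, long maxPollIntervalMs) {
        return worstCasePollDelayMs(polledRecords, maxAttempts, backOffMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int polledRecords = 500;            // Kafka default for max.poll.records
        long maxPollIntervalMs = 300_000L;  // Kafka default for max.poll.interval.ms
        int maxAttempts = 3;                // illustrative retry settings
        long backOffMs = 1_000L;

        long worstCase = worstCasePollDelayMs(polledRecords, maxAttempts, backOffMs);
        System.out.println("Worst-case retry delay per poll (ms): " + worstCase);
        System.out.println("Exceeds max.poll.interval.ms: "
                + risksRebalance(polledRecords, maxAttempts, backOffMs, maxPollIntervalMs));
    }
}
```

      Under these assumptions, even a modest retry policy can exceed the poll interval when a full batch fails, which is why the non-blocking error handler approach is suggested instead.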
    • setRecoveryCallback

      public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
      A RecoveryCallback instance for the retry operation; only used if setRetryTemplate(RetryTemplate) is specified. Defaults to an ErrorMessageSendingRecoverer if an error channel has been provided. If null, the exception is thrown to the container after retries are exhausted (unless an error channel is configured).
      Parameters:
      recoveryCallback - the recovery callback.
    • setOnPartitionsAssignedSeekCallback

      public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
      Specify a BiConsumer to manage seeks when the KafkaMessageListenerContainer calls ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback). It is called from the internal MessagingMessageListenerAdapter implementation.
      Parameters:
      onPartitionsAssignedCallback - the BiConsumer to use.
      See Also:
      • ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
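      The shape of such a BiConsumer can be sketched without the Kafka libraries. In the sketch below, TopicPartition and SeekCallback are local stand-ins (assumptions for illustration only) for org.apache.kafka.common.TopicPartition and ConsumerSeekAware.ConsumerSeekCallback, and rewindBy is a hypothetical helper. It treats the map values as the current offsets of the newly assigned partitions and rewinds each one by a fixed amount.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Library-free sketch; the nested types are stand-ins, not the real Kafka/Spring types.
final class SeekOnAssignSketch {

    /** Stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof TopicPartition
                    && ((TopicPartition) other).topic.equals(this.topic)
                    && ((TopicPartition) other).partition == this.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topic, this.partition);
        }
    }

    /** Stand-in exposing only a seek(topic, partition, offset) operation. */
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    /** Hypothetical helper: rewind every assigned partition by 'rewind' offsets, never below 0. */
    static BiConsumer<Map<TopicPartition, Long>, SeekCallback> rewindBy(long rewind) {
        return (assignments, callback) -> assignments.forEach((tp, offset) ->
                callback.seek(tp.topic, tp.partition, Math.max(0L, offset - rewind)));
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> assigned = new LinkedHashMap<>();
        assigned.put(new TopicPartition("requests", 0), 42L);
        assigned.put(new TopicPartition("requests", 1), 3L);

        // Print each seek instead of performing it against a real consumer.
        rewindBy(10L).accept(assigned, (topic, partition, offset) ->
                System.out.println("seek " + topic + "-" + partition + " to " + offset));
    }
}
```

      With the real types, a lambda of the same shape would be passed to setOnPartitionsAssignedSeekCallback.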
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
      Parameters:
      bindSourceRecord - true to bind.
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class MessagingGatewaySupport
    • doStart

      protected void doStart()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the start behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Overrides:
      doStart in class MessagingGatewaySupport
    • doStop

      protected void doStop()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the stop behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Overrides:
      doStop in class MessagingGatewaySupport
    • pause

      public void pause()
      Description copied from interface: Pausable
      Pause the endpoint.
      Specified by:
      pause in interface Pausable
    • resume

      public void resume()
      Description copied from interface: Pausable
      Resume the endpoint if paused.
      Specified by:
      resume in interface Pausable
    • isPaused

      public boolean isPaused()
      Description copied from interface: Pausable
      Check if the endpoint is paused.
      Specified by:
      isPaused in interface Pausable
      Returns:
      true if paused.
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessagingGatewaySupport
    • beforeShutdown

      public int beforeShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.
      Specified by:
      beforeShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
    • afterShutdown

      public int afterShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called after normal shutdown of schedulers, executors, etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
      Specified by:
      afterShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
    • getErrorMessageAttributes

      protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
      Description copied from class: MessagingGatewaySupport
      Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
      Overrides:
      getErrorMessageAttributes in class MessagingGatewaySupport
      Parameters:
      message - the message.
      Returns:
      the attributes.