Class KafkaInboundGateway<K,V,R>

Type Parameters:
K - the key type.
V - the request value type.
R - the reply value type.
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, OrderlyShutdownCapable, Pausable, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent

public class KafkaInboundGateway<K,V,R>
extends MessagingGatewaySupport
implements Pausable, OrderlyShutdownCapable
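Inbound gateway: receives request records through the provided message listener container and sends replies with the provided KafkaTemplate.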
Since:
5.4
Author:
Gary Russell, Artem Bilan, Urs Keller
  • Constructor Details

    • KafkaInboundGateway

      public KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K,R> kafkaTemplate)
      Construct an instance with the provided container and template.
      Parameters:
      messageListenerContainer - the container.
      kafkaTemplate - the Kafka template.
  • Method Details

    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set the message converter; must be a RecordMessageConverter.
      Parameters:
      messageConverter - the converter.
    • setPayloadType

      public void setPayloadType(Class<?> payloadType)
      When using a type-aware message converter such as StringJsonMessageConverter, set the payload type the converter should create. Defaults to Object.
      Parameters:
      payloadType - the type.
    • setRetryTemplate

      public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
      Specify a RetryTemplate instance used to wrap the KafkaInboundGateway.IntegrationRecordMessageListener in a RetryingMessageListenerAdapter.
      Parameters:
      retryTemplate - the RetryTemplate to use.
    • setRecoveryCallback

      public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
      A RecoveryCallback instance for the retry operation; if null, the exception is thrown to the container after retries are exhausted (unless an error channel is configured). Only meaningful when a RetryTemplate is also provided via setRetryTemplate(RetryTemplate).
      Parameters:
      recoveryCallback - the recovery callback.
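The interplay between retries and the recovery callback described above can be illustrated with a minimal, library-free sketch. It is not Spring Retry's implementation: `RetryRecoverySketch`, `execute`, and `failingListener` are hypothetical names, and a `java.util.function.Function` stands in for `RecoveryCallback`. It only shows the documented behavior that, once attempts are exhausted, a non-null callback supplies the result and a null callback lets the exception propagate.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified stand-in for retry-then-recover; NOT Spring Retry's RetryTemplate.
class RetryRecoverySketch {

    // Runs the action up to maxAttempts times. When every attempt fails, a
    // non-null recovery function produces the result; with a null recovery
    // function the last exception propagates to the caller.
    static <T> T execute(Supplier<T> action, int maxAttempts,
            Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            }
            catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        if (recovery != null) {
            return recovery.apply(last);
        }
        throw last;
    }

    // Hypothetical listener stand-in that always fails.
    static String failingListener() {
        throw new IllegalStateException("boom");
    }

    // Retries are exhausted, then the recovery function supplies a result.
    static String demoWithRecovery() {
        return execute(RetryRecoverySketch::failingListener, 3,
                ex -> "recovered: " + ex.getMessage());
    }

    // Without a recovery function the exception reaches the caller.
    static boolean demoWithoutRecoveryThrows() {
        try {
            execute(RetryRecoverySketch::failingListener, 3, null);
            return false;
        }
        catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoWithRecovery());
        System.out.println(demoWithoutRecoveryThrows());
    }
}
```

In the gateway itself, the caller in this sketch corresponds to the listener container, which receives the exception when no recovery callback is set.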
    • setOnPartitionsAssignedSeekCallback

      public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
      Specify a BiConsumer to manage seeks when the KafkaMessageListenerContainer calls ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback). The BiConsumer is invoked from the internal MessagingMessageListenerAdapter implementation.
      Parameters:
      onPartitionsAssignedCallback - the BiConsumer to use.
      See Also:
      ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
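As a rough illustration of what such a BiConsumer might do, the sketch below rewinds each assigned partition by a fixed number of records. To stay runnable without Kafka on the classpath it uses hypothetical stand-ins: the nested `TopicPartition` class mimics `org.apache.kafka.common.TopicPartition`, and `SeekCallback` mimics only the `seek(String, int, long)` method of `ConsumerSeekAware.ConsumerSeekCallback`. It assumes the map values are the current offsets of the assigned partitions; `rewindBy` and `demo` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class SeekCallbackSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() {
            return topic;
        }

        int partition() {
            return partition;
        }
    }

    // Stand-in for ConsumerSeekAware.ConsumerSeekCallback, reduced to one method.
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // Builds a BiConsumer that rewinds every assigned partition by `rewind`
    // records, never seeking below offset 0.
    static BiConsumer<Map<TopicPartition, Long>, SeekCallback> rewindBy(long rewind) {
        return (assignments, callback) -> assignments.forEach((tp, offset) ->
                callback.seek(tp.topic(), tp.partition(), Math.max(0L, offset - rewind)));
    }

    // Applies the BiConsumer to two sample assignments and records each seek.
    static List<String> demo() {
        Map<TopicPartition, Long> assignments = new LinkedHashMap<>();
        assignments.put(new TopicPartition("requests", 0), 42L);
        assignments.put(new TopicPartition("requests", 1), 3L);
        List<String> seeks = new ArrayList<>();
        rewindBy(10).accept(assignments,
                (topic, partition, offset) -> seeks.add(topic + "-" + partition + "@" + offset));
        return seeks;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [requests-0@32, requests-1@0]
    }
}
```

With the real types on the classpath, a lambda of the same shape could be passed to setOnPartitionsAssignedSeekCallback.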
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
      Parameters:
      bindSourceRecord - true to bind.
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class MessagingGatewaySupport
    • doStart

      protected void doStart()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the start behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Overrides:
      doStart in class MessagingGatewaySupport
    • doStop

      protected void doStop()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the stop behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Overrides:
      doStop in class MessagingGatewaySupport
    • pause

      public void pause()
      Description copied from interface: Pausable
      Pause the endpoint.
      Specified by:
      pause in interface Pausable
    • resume

      public void resume()
      Description copied from interface: Pausable
      Resume the endpoint if paused.
      Specified by:
      resume in interface Pausable
    • isPaused

      public boolean isPaused()
      Description copied from interface: Pausable
      Check if the endpoint is paused.
      Specified by:
      isPaused in interface Pausable
      Returns:
      true if paused.
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessagingGatewaySupport
    • beforeShutdown

      public int beforeShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.
      Specified by:
      beforeShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
    • afterShutdown

      public int afterShutdown()
      Description copied from interface: OrderlyShutdownCapable
      Called after normal shutdown of schedulers, executors, etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
      Specified by:
      afterShutdown in interface OrderlyShutdownCapable
      Returns:
      The number of active messages if available.
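The OrderlyShutdownCapable contract described by beforeShutdown() and afterShutdown() can be sketched generically as follows. This is not how KafkaInboundGateway implements these methods; `OrderlyShutdownSketch`, `tryBegin`, and `complete` are hypothetical names used only to show a component that stops accepting work and reports its in-flight count at both points.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Generic illustration of the two-phase shutdown contract (hypothetical class).
class OrderlyShutdownSketch {

    private final AtomicBoolean accepting = new AtomicBoolean(true);
    private final AtomicInteger active = new AtomicInteger();

    // Returns false once shutdown has begun; otherwise counts the message as
    // active. The check and the increment are not atomic together, which is
    // acceptable for a sketch but not for production code.
    boolean tryBegin() {
        if (!accepting.get()) {
            return false;
        }
        active.incrementAndGet();
        return true;
    }

    // Marks one in-flight message as finished.
    void complete() {
        active.decrementAndGet();
    }

    // Phase 1: stop accepting new messages and report the in-flight count.
    int beforeShutdown() {
        accepting.set(false);
        return active.get();
    }

    // Phase 2: report whatever is still in flight after the shutdown delay.
    int afterShutdown() {
        return active.get();
    }

    // Returns {count before shutdown, count after shutdown, 1 if new work was rejected}.
    static int[] demo() {
        OrderlyShutdownSketch endpoint = new OrderlyShutdownSketch();
        endpoint.tryBegin();
        endpoint.tryBegin();
        int before = endpoint.beforeShutdown();
        boolean rejected = !endpoint.tryBegin();
        endpoint.complete();
        int after = endpoint.afterShutdown();
        return new int[] { before, after, rejected ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // 2 1 1
    }
}
```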
    • getErrorMessageAttributes

      protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
      Description copied from class: MessagingGatewaySupport
      Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
      Overrides:
      getErrorMessageAttributes in class MessagingGatewaySupport
      Parameters:
      message - the message.
      Returns:
      the attributes.