Class KafkaProducerMessageHandler<K,V>

Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>

public class KafkaProducerMessageHandler<K,V> extends AbstractReplyProducingMessageHandler implements ManageableLifecycle
A Message Handler for Apache Kafka; when supplied with a ReplyingKafkaTemplate it is used as the handler in an outbound gateway. When supplied with a simple KafkaTemplate it used as the handler in an outbound channel adapter.

The handler also supports receiving a pre-built ProducerRecord payload. In that case, most configuration properties (setTopicExpression(Expression) etc.) are ignored. If the handler is used as gateway, the ProducerRecord will have its headers enhanced to add the KafkaHeaders.REPLY_TOPIC unless it already contains such a header. The handler will not map any additional headers; providing such a payload assumes the headers have already been mapped.

Since:
5.4
Author:
Soby Chacko, Artem Bilan, Gary Russell, Marius Bogoevici, Biju Kunjummen, Tom van den Berge
  • Constructor Details

    • KafkaProducerMessageHandler

      public KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate)
  • Method Details

    • setTopicExpression

      public void setTopicExpression(Expression topicExpression)
    • setMessageKeyExpression

      public void setMessageKeyExpression(Expression messageKeyExpression)
    • setPartitionIdExpression

      public void setPartitionIdExpression(Expression partitionIdExpression)
    • setTimestampExpression

      public void setTimestampExpression(Expression timestampExpression)
      Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. The resulting value should be a Long type representing epoch time in milliseconds.
      Parameters:
      timestampExpression - the Expression for timestamp to wait for result fo send operation.
    • setFlushExpression

      public void setFlushExpression(Expression flushExpression)
      Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send. Defaults to looking for a Boolean value in a KafkaIntegrationHeaders.FLUSH header; false if absent.
      Parameters:
      flushExpression - the Expression.
    • setHeaderMapper

      public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
      Set the header mapper to use.
      Parameters:
      headerMapper - the mapper; can be null to disable header mapping.
    • getHeaderMapper

      public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
    • getKafkaTemplate

      public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
    • setSync

      public void setSync(boolean sync)
      A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
      Parameters:
      sync - the send mode; async by default.
    • setSendTimeout

      public final void setSendTimeout(long sendTimeout)
      Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the kafka delivery.timeout.ms property + 5 seconds. The timeout is applied Also applies when sending to the success or failure channels.
      Overrides:
      setSendTimeout in class AbstractMessageProducingHandler
      Parameters:
      sendTimeout - the timeout to wait for result for a send operation.
    • setSendTimeoutExpression

      public void setSendTimeoutExpression(Expression sendTimeoutExpression)
      Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the kafka delivery.timeout.ms property + 5 seconds. The timeout is applied only in sync mode. If this expression yields a result that is less than that value, the higher value is used.
      Parameters:
      sendTimeoutExpression - the Expression for timeout to wait for result for a send operation.
      See Also:
    • setSendFailureChannel

      public void setSendFailureChannel(MessageChannel sendFailureChannel)
      Set the failure channel. After a send failure, an ErrorMessage will be sent to this channel with a payload of a KafkaSendFailureException with the failed message and cause.
      Parameters:
      sendFailureChannel - the failure channel.
    • setSendFailureChannelName

      public void setSendFailureChannelName(String sendFailureChannelName)
      Set the failure channel name. After a send failure, an ErrorMessage will be sent to this channel name with a payload of a KafkaSendFailureException with the failed message and cause.
      Parameters:
      sendFailureChannelName - the failure channel name.
    • setSendSuccessChannel

      public void setSendSuccessChannel(MessageChannel sendSuccessChannel)
      Set the success channel.
      Parameters:
      sendSuccessChannel - the Success channel.
    • setSendSuccessChannelName

      public void setSendSuccessChannelName(String sendSuccessChannelName)
      Set the Success channel name.
      Parameters:
      sendSuccessChannelName - the Success channel name.
    • setFuturesChannel

      public void setFuturesChannel(MessageChannel futuresChannel)
      Set the futures channel.
      Parameters:
      futuresChannel - the futures channel.
    • setFuturesChannelName

      public void setFuturesChannelName(String futuresChannelName)
      Set the futures channel name.
      Parameters:
      futuresChannelName - the futures channel name.
    • setErrorMessageStrategy

      public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy)
      Set the error message strategy implementation to use when sending error messages after send failures. Cannot be null.
      Parameters:
      errorMessageStrategy - the implementation.
    • setReplyMessageConverter

      public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set a message converter for gateway replies.
      Parameters:
      messageConverter - the converter.
      See Also:
    • setReplyPayloadType

      public void setReplyPayloadType(Type payloadType)
      When using a type-aware message converter (such as StringJsonMessageConverter, set the payload type the converter should create. Defaults to Object.
      Parameters:
      payloadType - the type.
      See Also:
    • setProducerRecordCreator

      public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator)
      Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord. Ignored if useTemplateConverter is true.
      Parameters:
      producerRecordCreator - the creator.
      See Also:
    • setTimeoutBuffer

      public void setTimeoutBuffer(int timeoutBuffer)
      Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
      Parameters:
      timeoutBuffer - the buffer.
      See Also:
    • setUseTemplateConverter

      public void setUseTemplateConverter(boolean useTemplateConverter)
      Set to true to use the template's message converter to create the ProducerRecord instead of the producerRecordCreator.
      Parameters:
      useTemplateConverter - true to use the converter.
      Since:
      5.5.5
      See Also:
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessageHandlerSupport
    • getSendFailureChannel

      protected MessageChannel getSendFailureChannel()
    • getSendSuccessChannel

      protected MessageChannel getSendSuccessChannel()
    • getFuturesChannel

      protected MessageChannel getFuturesChannel()
    • doInit

      protected void doInit()
      Overrides:
      doInit in class AbstractReplyProducingMessageHandler
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • handleRequestMessage

      protected Object handleRequestMessage(Message<?> message)
      Description copied from class: AbstractReplyProducingMessageHandler
      Subclasses must implement this method to handle the request Message. The return value may be a Message, a MessageBuilder, or any plain Object. The base class will handle the final creation of a reply Message from any of those starting points. If the return value is null, the Message flow will end here.
      Specified by:
      handleRequestMessage in class AbstractReplyProducingMessageHandler
      Parameters:
      message - The request message.
      Returns:
      The result of handling the message, or null.
    • processSendResult

      public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel) throws InterruptedException, ExecutionException
      Throws:
      InterruptedException
      ExecutionException