Class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>>

Type Parameters:
K - the key type.
V - the value type.
S - the KafkaProducerMessageHandlerSpec extension type.
All Implemented Interfaces:
DisposableBean, FactoryBean<KafkaProducerMessageHandler<K,V>>, InitializingBean, Lifecycle, Phased, SmartLifecycle
Direct Known Subclasses:
KafkaOutboundGatewaySpec, KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec

public class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>> extends MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>
Since:
5.4
Author:
Artem Bilan, Biju Kunjummen, Gary Russell
  • Method Details

    • topic

      public S topic(String topic)
      Configure the Kafka topic to send messages.
      Parameters:
      topic - the Kafka topic name.
      Returns:
      the spec.
    • topicExpression

      public S topicExpression(String topicExpression)
      Configure a SpEL expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.
      Parameters:
      topicExpression - the topic SpEL expression.
      Returns:
      the spec.
    • topicExpression

      public S topicExpression(Expression topicExpression)
      Configure an Expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.
      Parameters:
      topicExpression - the topic expression.
      Returns:
      the spec.
    • topic

      public <P> S topic(Function<Message<P>,String> topicFunction)
      Configure a Function that will be invoked at runtime to determine the topic to which a message will be sent. Typically used with a Java 8 Lambda expression:
       
       .<Foo>topic(m -> m.getPayload().getTopic())
       
       
      Type Parameters:
      P - the expected payload type.
      Parameters:
      topicFunction - the topic function.
      Returns:
      the current KafkaProducerMessageHandlerSpec.
      See Also:
    • messageKeyExpression

      public S messageKeyExpression(String messageKeyExpression)
      Configure a SpEL expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.
      Parameters:
      messageKeyExpression - the message key SpEL expression.
      Returns:
      the spec.
    • messageKey

      public S messageKey(String messageKey)
      Configure the message key to store message in Kafka topic.
      Parameters:
      messageKey - the message key to use.
      Returns:
      the spec.
    • messageKeyExpression

      public S messageKeyExpression(Expression messageKeyExpression)
      Configure an Expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.
      Parameters:
      messageKeyExpression - the message key expression.
      Returns:
      the spec.
    • messageKey

      public <P> S messageKey(Function<Message<P>,?> messageKeyFunction)
      Configure a Function that will be invoked at runtime to determine the message key under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:
       
       .<Foo>messageKey(m -> m.getPayload().getKey())
       
       
      Type Parameters:
      P - the expected payload type.
      Parameters:
      messageKeyFunction - the message key function.
      Returns:
      the current KafkaProducerMessageHandlerSpec.
      See Also:
    • partitionId

      public S partitionId(Integer partitionId)
      Configure a partitionId of Kafka topic.
      Parameters:
      partitionId - the partitionId to use.
      Returns:
      the spec.
    • partitionIdExpression

      public S partitionIdExpression(String partitionIdExpression)
      Configure a SpEL expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.
      Parameters:
      partitionIdExpression - the partitionId expression to use.
      Returns:
      the spec.
    • partitionId

      public <P> S partitionId(Function<Message<P>,Integer> partitionIdFunction)
      Configure a Function that will be invoked at runtime to determine the partition id under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:
       
       .partitionId(m -> m.getHeaders().get("partitionId", Integer.class))
       
       
      Type Parameters:
      P - the expected payload type.
      Parameters:
      partitionIdFunction - the partitionId function.
      Returns:
      the spec.
    • partitionIdExpression

      public S partitionIdExpression(Expression partitionIdExpression)
      Configure an Expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.
      Parameters:
      partitionIdExpression - the partitionId expression to use.
      Returns:
      the spec.
    • timestampExpression

      public S timestampExpression(String timestampExpression)
      Configure a SpEL expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.
      Parameters:
      timestampExpression - the timestamp expression to use.
      Returns:
      the spec.
    • timestamp

      public <P> S timestamp(Function<Message<P>,Long> timestampFunction)
      Configure a Function that will be invoked at runtime to determine the Kafka record timestamp will be stored in the topic. Typically used with a Java 8 Lambda expression:
       
       .timestamp(m -> m.getHeaders().get("mytimestamp_header", Long.class))
       
       
      Type Parameters:
      P - the expected payload type.
      Parameters:
      timestampFunction - the timestamp function.
      Returns:
      the spec.
    • timestampExpression

      public S timestampExpression(Expression timestampExpression)
      Configure an Expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.
      Parameters:
      timestampExpression - the timestamp expression to use.
      Returns:
      the spec.
    • flushExpression

      public S flushExpression(String flushExpression)
      Configure a SpEL expression to determine whether to flush the producer after a send. By default, the producer is flushed if a header kafka_flush has a value Boolean.TRUE.
      Parameters:
      flushExpression - the timestamp expression to use.
      Returns:
      the spec.
    • flush

      public <P> S flush(Function<Message<P>,Boolean> flushFunction)
      Configure a Function that will be invoked at runtime to determine whether to flush the producer after send. By default, the producer is flushed if a header kafka_flush has a value Boolean.TRUE. Typically, used with a Java Lambda expression:
       
       .flush(m -> m.getPayload().shouldFlush())
       
       
      Type Parameters:
      P - the expected payload type.
      Parameters:
      flushFunction - the flush function.
      Returns:
      the spec.
    • flushExpression

      public S flushExpression(Expression flushExpression)
      Configure an Expression to determine whether to flush the producer after a send. By default, the producer is flushed if a header kafka_flush has a value Boolean.TRUE.
      Parameters:
      flushExpression - the timestamp expression to use.
      Returns:
      the spec.
    • sync

      public S sync(boolean sync)
      A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
      Parameters:
      sync - the send mode; async by default.
      Returns:
      the spec.
    • sendTimeout

      public S sendTimeout(long sendTimeout)
      Specify a timeout in milliseconds how long KafkaProducerMessageHandler should wait for send operation results. Defaults to 10 seconds.
      Parameters:
      sendTimeout - the timeout to wait for result fo send operation.
      Returns:
      the spec.
    • headerMapper

      public S headerMapper(org.springframework.kafka.support.KafkaHeaderMapper mapper)
      Specify a header mapper to map spring messaging headers to Kafka headers.
      Parameters:
      mapper - the mapper.
      Returns:
      the spec.
    • sendSuccessChannel

      public S sendSuccessChannel(MessageChannel sendSuccessChannel)
      Set the channel to which successful send results are sent.
      Parameters:
      sendSuccessChannel - the channel.
      Returns:
      the spec.
    • sendSuccessChannel

      public S sendSuccessChannel(String sendSuccessChannel)
      Set the channel to which successful send results are sent.
      Parameters:
      sendSuccessChannel - the channel name.
      Returns:
      the spec.
    • sendFailureChannel

      public S sendFailureChannel(MessageChannel sendFailureChannel)
      Set the channel to which failed send results are sent.
      Parameters:
      sendFailureChannel - the channel.
      Returns:
      the spec.
    • sendFailureChannel

      public S sendFailureChannel(String sendFailureChannel)
      Set the channel to which failed send results are sent.
      Parameters:
      sendFailureChannel - the channel name.
      Returns:
      the spec.
    • futuresChannel

      public S futuresChannel(MessageChannel futuresChannel)
      Set the channel to which send futures are sent.
      Parameters:
      futuresChannel - the channel.
      Returns:
      the spec.
    • futuresChannel

      public S futuresChannel(String futuresChannel)
      Set the channel to which send futures are sent.
      Parameters:
      futuresChannel - the channel name.
      Returns:
      the spec.
    • producerRecordCreator

      public S producerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> creator)
      Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord. Ignored if useTemplateConverter is true.
      Parameters:
      creator - the creator.
      Returns:
      the spec.
      Since:
      5.5.5
    • useTemplateConverter

      public S useTemplateConverter(boolean use)
      Set to true to use the template's message converter to create the ProducerRecord instead of the producerRecordCreator.
      Parameters:
      use - true to use the converter.
      Returns:
      the spec.
      Since:
      5.5.5