Class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>>
java.lang.Object
org.springframework.integration.dsl.IntegrationComponentSpec<S,H>
org.springframework.integration.dsl.MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>
org.springframework.integration.kafka.dsl.KafkaProducerMessageHandlerSpec<K,V,S>
- Type Parameters:
K- the key type.V- the value type.S- theKafkaProducerMessageHandlerSpecextension type.
- All Implemented Interfaces:
DisposableBean,FactoryBean<KafkaProducerMessageHandler<K,,V>> InitializingBean,Lifecycle,Phased,SmartLifecycle
- Direct Known Subclasses:
KafkaOutboundGatewaySpec,KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec
public class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>>
extends MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>
A
MessageHandlerSpec implementation for the KafkaProducerMessageHandler.- Since:
- 5.4
- Author:
- Artem Bilan, Biju Kunjummen, Gary Russell
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classAKafkaTemplate-basedKafkaProducerMessageHandlerSpecextension. -
Field Summary
Fields inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
logger, PARSER, targetFields inherited from interface org.springframework.beans.factory.FactoryBean
OBJECT_TYPE_ATTRIBUTEFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Method Summary
Modifier and TypeMethodDescription<P> SConfigure aFunctionthat will be invoked at runtime to determine whether to flush the producer after send.flushExpression(String flushExpression) Configure a SpEL expression to determine whether to flush the producer after a send.flushExpression(Expression flushExpression) Configure anExpressionto determine whether to flush the producer after a send.futuresChannel(String futuresChannel) Set the channel to which send futures are sent.futuresChannel(MessageChannel futuresChannel) Set the channel to which send futures are sent.headerMapper(org.springframework.kafka.support.KafkaHeaderMapper mapper) Specify a header mapper to map spring messaging headers to Kafka headers.messageKey(String messageKey) Configure the message key to store message in Kafka topic.<P> SmessageKey(Function<Message<P>, ?> messageKeyFunction) Configure aFunctionthat will be invoked at runtime to determine the message key under which a message will be stored in the topic.messageKeyExpression(String messageKeyExpression) Configure a SpEL expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.messageKeyExpression(Expression messageKeyExpression) Configure anExpressionto determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.partitionId(Integer partitionId) Configure a partitionId of Kafka topic.<P> SpartitionId(Function<Message<P>, Integer> partitionIdFunction) Configure aFunctionthat will be invoked at runtime to determine the partition id under which a message will be stored in the topic.partitionIdExpression(String partitionIdExpression) Configure a SpEL expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.partitionIdExpression(Expression partitionIdExpression) Configure anExpressionto determine the topic partitionId at runtime against request Message as a root object of evaluation context.Set aKafkaProducerMessageHandler.ProducerRecordCreatorto create theProducerRecord.sendFailureChannel(String sendFailureChannel) Set the channel to which failed send results are sent.sendFailureChannel(MessageChannel sendFailureChannel) Set the channel to which failed send results are sent.sendSuccessChannel(String sendSuccessChannel) Set the channel to which successful send results are sent.sendSuccessChannel(MessageChannel sendSuccessChannel) Set the channel to which successful send results are sent.sendTimeout(long sendTimeout) Specify a timeout in milliseconds how longKafkaProducerMessageHandlershould wait for send operation results.sync(boolean sync) Abooleanindicating if theKafkaProducerMessageHandlershould wait for the send operation results or not.<P> SConfigure aFunctionthat will be invoked at runtime to determine the Kafka record timestamp will be stored in the topic.timestampExpression(String timestampExpression) Configure a SpEL expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.timestampExpression(Expression timestampExpression) Configure anExpressionto determine the timestamp at runtime against a request Message as a root object of evaluation context.Configure the Kafka topic to send messages.<P> SConfigure aFunctionthat will be invoked at runtime to determine the topic to which a message will be sent.topicExpression(String topicExpression) Configure a SpEL expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.topicExpression(Expression topicExpression) Configure anExpressionto determine the Kafka topic at runtime against request Message as a root object of evaluation context.useTemplateConverter(boolean use) Set to true to use the template's message converter to create theProducerRecordinstead of theproducerRecordCreator.Methods inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
_this, afterPropertiesSet, destroy, doGet, getId, getObject, getObjectType, getPhase, id, isAutoStartup, isRunning, start, stop, stopMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.beans.factory.FactoryBean
isSingleton
-
Method Details
-
topic
Configure the Kafka topic to send messages.- Parameters:
topic- the Kafka topic name.- Returns:
- the spec.
-
topicExpression
Configure a SpEL expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.- Parameters:
topicExpression- the topic SpEL expression.- Returns:
- the spec.
-
topicExpression
Configure anExpressionto determine the Kafka topic at runtime against request Message as a root object of evaluation context.- Parameters:
topicExpression- the topic expression.- Returns:
- the spec.
-
topic
Configure aFunctionthat will be invoked at runtime to determine the topic to which a message will be sent. Typically used with a Java 8 Lambda expression:.<Foo>topic(m -> m.getPayload().getTopic())- Type Parameters:
P- the expected payload type.- Parameters:
topicFunction- the topic function.- Returns:
- the current
KafkaProducerMessageHandlerSpec. - See Also:
-
messageKeyExpression
Configure a SpEL expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.- Parameters:
messageKeyExpression- the message key SpEL expression.- Returns:
- the spec.
-
messageKey
Configure the message key to store message in Kafka topic.- Parameters:
messageKey- the message key to use.- Returns:
- the spec.
-
messageKeyExpression
Configure anExpressionto determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.- Parameters:
messageKeyExpression- the message key expression.- Returns:
- the spec.
-
messageKey
Configure aFunctionthat will be invoked at runtime to determine the message key under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:.<Foo>messageKey(m -> m.getPayload().getKey())- Type Parameters:
P- the expected payload type.- Parameters:
messageKeyFunction- the message key function.- Returns:
- the current
KafkaProducerMessageHandlerSpec. - See Also:
-
partitionId
Configure a partitionId of Kafka topic.- Parameters:
partitionId- the partitionId to use.- Returns:
- the spec.
-
partitionIdExpression
Configure a SpEL expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.- Parameters:
partitionIdExpression- the partitionId expression to use.- Returns:
- the spec.
-
partitionId
Configure aFunctionthat will be invoked at runtime to determine the partition id under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:.partitionId(m -> m.getHeaders().get("partitionId", Integer.class))- Type Parameters:
P- the expected payload type.- Parameters:
partitionIdFunction- the partitionId function.- Returns:
- the spec.
-
partitionIdExpression
Configure anExpressionto determine the topic partitionId at runtime against request Message as a root object of evaluation context.- Parameters:
partitionIdExpression- the partitionId expression to use.- Returns:
- the spec.
-
timestampExpression
Configure a SpEL expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.- Parameters:
timestampExpression- the timestamp expression to use.- Returns:
- the spec.
-
timestamp
Configure aFunctionthat will be invoked at runtime to determine the Kafka record timestamp will be stored in the topic. Typically used with a Java 8 Lambda expression:.timestamp(m -> m.getHeaders().get("mytimestamp_header", Long.class))- Type Parameters:
P- the expected payload type.- Parameters:
timestampFunction- the timestamp function.- Returns:
- the spec.
-
timestampExpression
Configure anExpressionto determine the timestamp at runtime against a request Message as a root object of evaluation context.- Parameters:
timestampExpression- the timestamp expression to use.- Returns:
- the spec.
-
flushExpression
Configure a SpEL expression to determine whether to flush the producer after a send. By default, the producer is flushed if a headerkafka_flushhas a valueBoolean.TRUE.- Parameters:
flushExpression- the timestamp expression to use.- Returns:
- the spec.
-
flush
Configure aFunctionthat will be invoked at runtime to determine whether to flush the producer after send. By default, the producer is flushed if a headerkafka_flushhas a valueBoolean.TRUE. Typically, used with a Java Lambda expression:.flush(m -> m.getPayload().shouldFlush())- Type Parameters:
P- the expected payload type.- Parameters:
flushFunction- the flush function.- Returns:
- the spec.
-
flushExpression
Configure anExpressionto determine whether to flush the producer after a send. By default, the producer is flushed if a headerkafka_flushhas a valueBoolean.TRUE.- Parameters:
flushExpression- the timestamp expression to use.- Returns:
- the spec.
-
sync
Abooleanindicating if theKafkaProducerMessageHandlershould wait for the send operation results or not. Defaults tofalse. Insyncmode a downstream send operation exception will be re-thrown.- Parameters:
sync- the send mode; async by default.- Returns:
- the spec.
-
sendTimeout
Specify a timeout in milliseconds how longKafkaProducerMessageHandlershould wait for send operation results. Defaults to 10 seconds.- Parameters:
sendTimeout- the timeout to wait for result fo send operation.- Returns:
- the spec.
-
headerMapper
Specify a header mapper to map spring messaging headers to Kafka headers.- Parameters:
mapper- the mapper.- Returns:
- the spec.
-
sendSuccessChannel
Set the channel to which successful send results are sent.- Parameters:
sendSuccessChannel- the channel.- Returns:
- the spec.
-
sendSuccessChannel
Set the channel to which successful send results are sent.- Parameters:
sendSuccessChannel- the channel name.- Returns:
- the spec.
-
sendFailureChannel
Set the channel to which failed send results are sent.- Parameters:
sendFailureChannel- the channel.- Returns:
- the spec.
-
sendFailureChannel
Set the channel to which failed send results are sent.- Parameters:
sendFailureChannel- the channel name.- Returns:
- the spec.
-
futuresChannel
Set the channel to which send futures are sent.- Parameters:
futuresChannel- the channel.- Returns:
- the spec.
-
futuresChannel
Set the channel to which send futures are sent.- Parameters:
futuresChannel- the channel name.- Returns:
- the spec.
-
producerRecordCreator
Set aKafkaProducerMessageHandler.ProducerRecordCreatorto create theProducerRecord. Ignored ifuseTemplateConverteris true.- Parameters:
creator- the creator.- Returns:
- the spec.
- Since:
- 5.5.5
-
useTemplateConverter
Set to true to use the template's message converter to create theProducerRecordinstead of theproducerRecordCreator.- Parameters:
use- true to use the converter.- Returns:
- the spec.
- Since:
- 5.5.5
-