Class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>>
java.lang.Object
org.springframework.integration.dsl.IntegrationComponentSpec<S,H>
org.springframework.integration.dsl.MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>
org.springframework.integration.kafka.dsl.KafkaProducerMessageHandlerSpec<K,V,S>
- Type Parameters:
K
- the key type.V
- the value type.S
- theKafkaProducerMessageHandlerSpec
extension type.
- All Implemented Interfaces:
DisposableBean
,FactoryBean<KafkaProducerMessageHandler<K,
,V>> InitializingBean
,Lifecycle
,Phased
,SmartLifecycle
- Direct Known Subclasses:
KafkaOutboundGatewaySpec
,KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec
public class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>>
extends MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>
A
MessageHandlerSpec
implementation for the KafkaProducerMessageHandler
.- Since:
- 5.4
- Author:
- Artem Bilan, Biju Kunjummen, Gary Russell
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic class
AKafkaTemplate
-basedKafkaProducerMessageHandlerSpec
extension. -
Field Summary
Fields inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
logger, PARSER, target
Fields inherited from interface org.springframework.beans.factory.FactoryBean
OBJECT_TYPE_ATTRIBUTE
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Method Summary
Modifier and TypeMethodDescription<P> S
Configure aFunction
that will be invoked at runtime to determine whether to flush the producer after send.flushExpression
(String flushExpression) Configure a SpEL expression to determine whether to flush the producer after a send.flushExpression
(Expression flushExpression) Configure anExpression
to determine whether to flush the producer after a send.futuresChannel
(String futuresChannel) Set the channel to which send futures are sent.futuresChannel
(MessageChannel futuresChannel) Set the channel to which send futures are sent.headerMapper
(org.springframework.kafka.support.KafkaHeaderMapper mapper) Specify a header mapper to map spring messaging headers to Kafka headers.messageKey
(String messageKey) Configure the message key to store message in Kafka topic.<P> S
messageKey
(Function<Message<P>, ?> messageKeyFunction) Configure aFunction
that will be invoked at runtime to determine the message key under which a message will be stored in the topic.messageKeyExpression
(String messageKeyExpression) Configure a SpEL expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.messageKeyExpression
(Expression messageKeyExpression) Configure anExpression
to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.partitionId
(Integer partitionId) Configure a partitionId of Kafka topic.<P> S
partitionId
(Function<Message<P>, Integer> partitionIdFunction) Configure aFunction
that will be invoked at runtime to determine the partition id under which a message will be stored in the topic.partitionIdExpression
(String partitionIdExpression) Configure a SpEL expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.partitionIdExpression
(Expression partitionIdExpression) Configure anExpression
to determine the topic partitionId at runtime against request Message as a root object of evaluation context.Set aKafkaProducerMessageHandler.ProducerRecordCreator
to create theProducerRecord
.sendFailureChannel
(String sendFailureChannel) Set the channel to which failed send results are sent.sendFailureChannel
(MessageChannel sendFailureChannel) Set the channel to which failed send results are sent.sendSuccessChannel
(String sendSuccessChannel) Set the channel to which successful send results are sent.sendSuccessChannel
(MessageChannel sendSuccessChannel) Set the channel to which successful send results are sent.sendTimeout
(long sendTimeout) Specify a timeout in milliseconds how longKafkaProducerMessageHandler
should wait for send operation results.sync
(boolean sync) Aboolean
indicating if theKafkaProducerMessageHandler
should wait for the send operation results or not.<P> S
Configure aFunction
that will be invoked at runtime to determine the Kafka record timestamp will be stored in the topic.timestampExpression
(String timestampExpression) Configure a SpEL expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.timestampExpression
(Expression timestampExpression) Configure anExpression
to determine the timestamp at runtime against a request Message as a root object of evaluation context.Configure the Kafka topic to send messages.<P> S
Configure aFunction
that will be invoked at runtime to determine the topic to which a message will be sent.topicExpression
(String topicExpression) Configure a SpEL expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.topicExpression
(Expression topicExpression) Configure anExpression
to determine the Kafka topic at runtime against request Message as a root object of evaluation context.useTemplateConverter
(boolean use) Set to true to use the template's message converter to create theProducerRecord
instead of theproducerRecordCreator
.Methods inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
_this, afterPropertiesSet, destroy, doGet, getId, getObject, getObjectType, getPhase, id, isAutoStartup, isRunning, start, stop, stop
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.beans.factory.FactoryBean
isSingleton
-
Method Details
-
topic
Configure the Kafka topic to send messages.- Parameters:
topic
- the Kafka topic name.- Returns:
- the spec.
-
topicExpression
Configure a SpEL expression to determine the Kafka topic at runtime against request Message as a root object of evaluation context.- Parameters:
topicExpression
- the topic SpEL expression.- Returns:
- the spec.
-
topicExpression
Configure anExpression
to determine the Kafka topic at runtime against request Message as a root object of evaluation context.- Parameters:
topicExpression
- the topic expression.- Returns:
- the spec.
-
topic
Configure aFunction
that will be invoked at runtime to determine the topic to which a message will be sent. Typically used with a Java 8 Lambda expression:.<Foo>topic(m -> m.getPayload().getTopic())
- Type Parameters:
P
- the expected payload type.- Parameters:
topicFunction
- the topic function.- Returns:
- the current
KafkaProducerMessageHandlerSpec
. - See Also:
-
messageKeyExpression
Configure a SpEL expression to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.- Parameters:
messageKeyExpression
- the message key SpEL expression.- Returns:
- the spec.
-
messageKey
Configure the message key to store message in Kafka topic.- Parameters:
messageKey
- the message key to use.- Returns:
- the spec.
-
messageKeyExpression
Configure anExpression
to determine the Kafka message key to store at runtime against request Message as a root object of evaluation context.- Parameters:
messageKeyExpression
- the message key expression.- Returns:
- the spec.
-
messageKey
Configure aFunction
that will be invoked at runtime to determine the message key under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:.<Foo>messageKey(m -> m.getPayload().getKey())
- Type Parameters:
P
- the expected payload type.- Parameters:
messageKeyFunction
- the message key function.- Returns:
- the current
KafkaProducerMessageHandlerSpec
. - See Also:
-
partitionId
Configure a partitionId of Kafka topic.- Parameters:
partitionId
- the partitionId to use.- Returns:
- the spec.
-
partitionIdExpression
Configure a SpEL expression to determine the topic partitionId at runtime against request Message as a root object of evaluation context.- Parameters:
partitionIdExpression
- the partitionId expression to use.- Returns:
- the spec.
-
partitionId
Configure aFunction
that will be invoked at runtime to determine the partition id under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:.partitionId(m -> m.getHeaders().get("partitionId", Integer.class))
- Type Parameters:
P
- the expected payload type.- Parameters:
partitionIdFunction
- the partitionId function.- Returns:
- the spec.
-
partitionIdExpression
Configure anExpression
to determine the topic partitionId at runtime against request Message as a root object of evaluation context.- Parameters:
partitionIdExpression
- the partitionId expression to use.- Returns:
- the spec.
-
timestampExpression
Configure a SpEL expression to determine the timestamp at runtime against a request Message as a root object of evaluation context.- Parameters:
timestampExpression
- the timestamp expression to use.- Returns:
- the spec.
-
timestamp
Configure aFunction
that will be invoked at runtime to determine the Kafka record timestamp will be stored in the topic. Typically used with a Java 8 Lambda expression:.timestamp(m -> m.getHeaders().get("mytimestamp_header", Long.class))
- Type Parameters:
P
- the expected payload type.- Parameters:
timestampFunction
- the timestamp function.- Returns:
- the spec.
-
timestampExpression
Configure anExpression
to determine the timestamp at runtime against a request Message as a root object of evaluation context.- Parameters:
timestampExpression
- the timestamp expression to use.- Returns:
- the spec.
-
flushExpression
Configure a SpEL expression to determine whether to flush the producer after a send. By default, the producer is flushed if a headerkafka_flush
has a valueBoolean.TRUE
.- Parameters:
flushExpression
- the timestamp expression to use.- Returns:
- the spec.
-
flush
Configure aFunction
that will be invoked at runtime to determine whether to flush the producer after send. By default, the producer is flushed if a headerkafka_flush
has a valueBoolean.TRUE
. Typically, used with a Java Lambda expression:.flush(m -> m.getPayload().shouldFlush())
- Type Parameters:
P
- the expected payload type.- Parameters:
flushFunction
- the flush function.- Returns:
- the spec.
-
flushExpression
Configure anExpression
to determine whether to flush the producer after a send. By default, the producer is flushed if a headerkafka_flush
has a valueBoolean.TRUE
.- Parameters:
flushExpression
- the timestamp expression to use.- Returns:
- the spec.
-
sync
Aboolean
indicating if theKafkaProducerMessageHandler
should wait for the send operation results or not. Defaults tofalse
. Insync
mode a downstream send operation exception will be re-thrown.- Parameters:
sync
- the send mode; async by default.- Returns:
- the spec.
-
sendTimeout
Specify a timeout in milliseconds how longKafkaProducerMessageHandler
should wait for send operation results. Defaults to 10 seconds.- Parameters:
sendTimeout
- the timeout to wait for result fo send operation.- Returns:
- the spec.
-
headerMapper
Specify a header mapper to map spring messaging headers to Kafka headers.- Parameters:
mapper
- the mapper.- Returns:
- the spec.
-
sendSuccessChannel
Set the channel to which successful send results are sent.- Parameters:
sendSuccessChannel
- the channel.- Returns:
- the spec.
-
sendSuccessChannel
Set the channel to which successful send results are sent.- Parameters:
sendSuccessChannel
- the channel name.- Returns:
- the spec.
-
sendFailureChannel
Set the channel to which failed send results are sent.- Parameters:
sendFailureChannel
- the channel.- Returns:
- the spec.
-
sendFailureChannel
Set the channel to which failed send results are sent.- Parameters:
sendFailureChannel
- the channel name.- Returns:
- the spec.
-
futuresChannel
Set the channel to which send futures are sent.- Parameters:
futuresChannel
- the channel.- Returns:
- the spec.
-
futuresChannel
Set the channel to which send futures are sent.- Parameters:
futuresChannel
- the channel name.- Returns:
- the spec.
-
producerRecordCreator
Set aKafkaProducerMessageHandler.ProducerRecordCreator
to create theProducerRecord
. Ignored ifuseTemplateConverter
is true.- Parameters:
creator
- the creator.- Returns:
- the spec.
- Since:
- 5.5.5
-
useTemplateConverter
Set to true to use the template's message converter to create theProducerRecord
instead of theproducerRecordCreator
.- Parameters:
use
- true to use the converter.- Returns:
- the spec.
- Since:
- 5.5.5
-