Class KafkaProducerMessageHandler<K,V>
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.handler.AbstractReplyProducingMessageHandler
org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
public class KafkaProducerMessageHandler<K,V> extends AbstractReplyProducingMessageHandler implements ManageableLifecycle
Kafka message handler. When supplied with a ReplyingKafkaTemplate, it is used as the handler in an outbound gateway. When supplied with a simple KafkaTemplate, it is used as the handler in an outbound channel adapter.
Starting with version 3.2.1, the handler supports receiving a pre-built ProducerRecord payload. In that case, most configuration properties (setTopicExpression(Expression) etc.) are ignored. If the handler is used as a gateway, the ProducerRecord will have its headers enhanced to add the KafkaHeaders.REPLY_TOPIC header unless it already contains such a header. The handler will not map any additional headers; providing such a payload assumes the headers have already been mapped.
- Since:
- 5.4
- Author:
- Soby Chacko, Artem Bilan, Gary Russell, Marius Bogoevici, Biju Kunjummen, Tom van den Berge
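A minimal configuration sketch of the outbound channel adapter case described in the class summary, assuming a KafkaTemplate<String, String> bean is already defined elsewhere in the application context. The class name KafkaOutboundConfig, the channel names toKafka and kafkaSendFailures, the topic name orders, and the orderId header are hypothetical and chosen only for illustration; the constructor and setters are the ones listed on this page.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHandler;

@Configuration
public class KafkaOutboundConfig {

    // Hypothetical wiring: messages arriving on the "toKafka" channel are published to Kafka.
    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler kafkaOutboundAdapter(KafkaTemplate<String, String> kafkaTemplate) {
        // A plain KafkaTemplate makes this an outbound channel adapter (no reply expected).
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);
        // Fixed topic name (assumed); any Expression can be supplied instead.
        handler.setTopicExpression(new LiteralExpression("orders"));
        // Record key taken from a message header (assumed header name).
        handler.setMessageKeyExpression(
                new SpelExpressionParser().parseExpression("headers['orderId']"));
        // Send failures are reported as ErrorMessages on this channel (assumed name).
        handler.setSendFailureChannelName("kafkaSendFailures");
        return handler;
    }
}
```

Supplying a ReplyingKafkaTemplate to the same constructor would, per the description above, turn the handler into an outbound gateway instead.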
-
Nested Class Summary
Nested Classes
static interface KafkaProducerMessageHandler.ProducerRecordCreator<K,V>
Creates a ProducerRecord from a Message and/or properties derived from configuration and/or the message.
Nested classes/interfaces inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
AbstractReplyProducingMessageHandler.RequestHandler
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructors
KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate)
-
Method Summary
protected void doInit()
String getComponentType()
Subclasses may implement this method to provide component type information.
protected MessageChannel getFuturesChannel()
org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
protected MessageChannel getSendFailureChannel()
protected MessageChannel getSendSuccessChannel()
protected Object handleRequestMessage(Message<?> message)
Subclasses must implement this method to handle the request Message.
boolean isRunning()
void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel)
void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy)
Set the error message strategy implementation to use when sending error messages after send failures.
void setFlushExpression(Expression flushExpression)
Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send.
void setFuturesChannel(MessageChannel futuresChannel)
Set the futures channel.
void setFuturesChannelName(String futuresChannelName)
Set the futures channel name.
void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
void setMessageKeyExpression(Expression messageKeyExpression)
void setPartitionIdExpression(Expression partitionIdExpression)
void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator)
Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord.
void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
void setReplyPayloadType(Type payloadType)
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create.
void setSendFailureChannel(MessageChannel sendFailureChannel)
Set the failure channel.
void setSendFailureChannelName(String sendFailureChannelName)
Set the failure channel name.
void setSendSuccessChannel(MessageChannel sendSuccessChannel)
Set the success channel.
void setSendSuccessChannelName(String sendSuccessChannelName)
Set the success channel name.
void setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results.
void setSendTimeoutExpression(Expression sendTimeoutExpression)
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results.
void setSync(boolean sync)
A boolean indicating whether the KafkaProducerMessageHandler should wait for the send operation results.
void setTimeoutBuffer(int timeoutBuffer)
Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
void setTimestampExpression(Expression timestampExpression)
Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
void setTopicExpression(Expression topicExpression)
void start()
void stop()
Methods inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
doInvokeAdvisedRequestHandler, getBeanClassLoader, getIntegrationPatternType, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
Method Details
-
setTopicExpression
-
setMessageKeyExpression
-
setPartitionIdExpression
-
setTimestampExpression
Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. The resulting value should be a Long type representing epoch time in milliseconds.
- Parameters:
timestampExpression - the Expression that evaluates to the record timestamp.
- Since:
- 2.3
-
setFlushExpression
Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send. Defaults to looking for a Boolean value in a KafkaIntegrationHeaders.FLUSH header; false if absent.
- Parameters:
flushExpression - the Expression.
- Since:
- 3.3
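The default flush rule described above (a Boolean header value, false when the header is absent) can be sketched in plain Java. This only illustrates the documented rule and is not the handler's implementation: the class FlushDefaultSketch is hypothetical, message headers are modeled as a plain Map, and the header key "kafka_flush" is an assumed stand-in for the value of KafkaIntegrationHeaders.FLUSH.

```java
import java.util.Map;

final class FlushDefaultSketch {

    // Assumed stand-in for the value of KafkaIntegrationHeaders.FLUSH.
    static final String FLUSH_HEADER = "kafka_flush";

    // True only when the header is present and holds Boolean.TRUE; false if absent.
    static boolean shouldFlush(Map<String, Object> headers) {
        return Boolean.TRUE.equals(headers.get(FLUSH_HEADER));
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(Map.of()));                   // false
        System.out.println(shouldFlush(Map.of(FLUSH_HEADER, true))); // true
    }
}
```

Setting a custom expression with setFlushExpression replaces this default, for example to flush on every send or based on a different header.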
-
setHeaderMapper
public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
- Parameters:
headerMapper - the mapper; can be null to disable header mapping.
- Since:
- 2.3
-
getHeaderMapper
public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
-
getKafkaTemplate
public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
-
setSync
public void setSync(boolean sync)
A boolean indicating whether the KafkaProducerMessageHandler should wait for the send operation results. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
- Parameters:
sync - the send mode; async by default.
- Since:
- 2.0.1
-
setSendTimeout
public final void setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout also applies when sending to the success or failure channels.
- Overrides:
setSendTimeout in class AbstractMessageProducingHandler
- Parameters:
sendTimeout - the timeout to wait for the result of a send operation.
- Since:
- 2.0.1
-
setSendTimeoutExpression
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout is applied only in sync mode. If this expression yields a result that is less than that value, the higher value is used.
- Parameters:
sendTimeoutExpression - the Expression for the timeout to wait for the result of a send operation.
- Since:
- 2.1.1
- See Also:
setTimeoutBuffer(int)
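The timeout rule stated above (minimum wait of delivery.timeout.ms plus a buffer, with the higher value winning when the expression yields less) can be sketched as simple arithmetic. This is an illustration of the documented rule only, not the handler's code: the class SendTimeoutSketch and the method effectiveSendTimeout are hypothetical names, and 120000 ms is used merely as an example delivery.timeout.ms value.

```java
final class SendTimeoutSketch {

    // The "+ 5 seconds" buffer mentioned on this page, in milliseconds.
    static final long DEFAULT_TIMEOUT_BUFFER_MS = 5_000L;

    // requestedTimeoutMs models the result of the send timeout expression; null means "not configured".
    static long effectiveSendTimeout(long deliveryTimeoutMs, long timeoutBufferMs, Long requestedTimeoutMs) {
        long minimum = deliveryTimeoutMs + timeoutBufferMs;
        if (requestedTimeoutMs == null) {
            return minimum; // default: delivery.timeout.ms + buffer
        }
        return Math.max(requestedTimeoutMs, minimum); // the higher value is used
    }

    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000L; // example value only
        System.out.println(effectiveSendTimeout(deliveryTimeoutMs, DEFAULT_TIMEOUT_BUFFER_MS, null));     // 125000
        System.out.println(effectiveSendTimeout(deliveryTimeoutMs, DEFAULT_TIMEOUT_BUFFER_MS, 10_000L));  // 125000
        System.out.println(effectiveSendTimeout(deliveryTimeoutMs, DEFAULT_TIMEOUT_BUFFER_MS, 300_000L)); // 300000
    }
}
```

Under this reading, a configured timeout shorter than delivery.timeout.ms plus the buffer has no effect in sync mode; changing the buffer with setTimeoutBuffer(int) shifts the minimum.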
-
setSendFailureChannel
Set the failure channel. After a send failure, an ErrorMessage will be sent to this channel with a payload of a KafkaSendFailureException with the failed message and cause.
- Parameters:
sendFailureChannel - the failure channel.
- Since:
- 2.1.2
-
setSendFailureChannelName
Set the failure channel name. After a send failure, an ErrorMessage will be sent to this channel name with a payload of a KafkaSendFailureException with the failed message and cause.
- Parameters:
sendFailureChannelName - the failure channel name.
- Since:
- 2.1.2
-
setSendSuccessChannel
Set the success channel.
- Parameters:
sendSuccessChannel - the success channel.
- Since:
- 3.0.2
-
setSendSuccessChannelName
Set the success channel name.
- Parameters:
sendSuccessChannelName - the success channel name.
- Since:
- 3.0.2
-
setFuturesChannel
Set the futures channel.
- Parameters:
futuresChannel - the futures channel.
- Since:
- 5.4
-
setFuturesChannelName
Set the futures channel name.
- Parameters:
futuresChannelName - the futures channel name.
- Since:
- 5.4
-
setErrorMessageStrategy
Set the error message strategy implementation to use when sending error messages after send failures. Cannot be null.
- Parameters:
errorMessageStrategy - the implementation.
- Since:
- 2.1.2
-
setReplyMessageConverter
public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
- Parameters:
messageConverter - the converter.
- Since:
- 3.0.2
- See Also:
setReplyPayloadType(Type)
-
setReplyPayloadType
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
- Parameters:
payloadType - the type.
- Since:
- 3.0.2
- See Also:
setReplyMessageConverter(RecordMessageConverter)
-
setProducerRecordCreator
public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator)
Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord.
- Parameters:
producerRecordCreator - the creator.
- Since:
- 3.2.1
-
setTimeoutBuffer
public void setTimeoutBuffer(int timeoutBuffer)
Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
- Parameters:
timeoutBuffer - the buffer.
- Since:
- 5.4
- See Also:
setSendTimeoutExpression(Expression)
-
getComponentType
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
getSendFailureChannel
-
getSendSuccessChannel
-
getFuturesChannel
-
doInit
protected void doInit()
- Overrides:
doInit in class AbstractReplyProducingMessageHandler
-
start
public void start()
- Specified by:
start in interface Lifecycle
- Specified by:
start in interface ManageableLifecycle
-
stop
public void stop()
- Specified by:
stop in interface Lifecycle
- Specified by:
stop in interface ManageableLifecycle
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface Lifecycle
- Specified by:
isRunning in interface ManageableLifecycle
-
handleRequestMessage
Description copied from class: AbstractReplyProducingMessageHandler
Subclasses must implement this method to handle the request Message. The return value may be a Message, a MessageBuilder, or any plain Object. The base class will handle the final creation of a reply Message from any of those starting points. If the return value is null, the Message flow will end here.
- Specified by:
handleRequestMessage in class AbstractReplyProducingMessageHandler
- Parameters:
message - The request message.
- Returns:
- The result of handling the message, or null.
-
processSendResult
public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel) throws InterruptedException, ExecutionException
-