K - the key type.
V - the value type.
public class KafkaProducerMessageHandler<K,V> extends AbstractReplyProducingMessageHandler implements ManageableLifecycle
When supplied with a ReplyingKafkaTemplate, it is used as the handler in an outbound gateway. When supplied with a simple KafkaTemplate, it is used as the handler in an outbound channel adapter.
Starting with version 3.2.1, the handler supports receiving a pre-built ProducerRecord payload. In that case, most configuration properties (setTopicExpression(Expression) etc.) are ignored. If the handler is used as a gateway, the ProducerRecord will have its headers enhanced to add the KafkaHeaders.REPLY_TOPIC header, unless it already contains such a header. The handler will not map any additional headers; providing such a payload assumes the headers have already been mapped.
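The reply-topic rule for pre-built ProducerRecord payloads amounts to "add the header only if it is absent". The following is a minimal sketch of that rule, not the handler's code: headers are modelled as a plain Map instead of Kafka's own header type, the class ReplyTopicHeaderSketch and its methods are hypothetical, and the constant is only a stand-in for KafkaHeaders.REPLY_TOPIC.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model of the "add REPLY_TOPIC unless present" rule. */
final class ReplyTopicHeaderSketch {

    /** Stand-in for KafkaHeaders.REPLY_TOPIC (assumed value, for illustration). */
    static final String REPLY_TOPIC = "kafka_replyTopic";

    private ReplyTopicHeaderSketch() {
    }

    /**
     * Returns a copy of the record headers that contains a reply topic.
     * An existing reply-topic header is kept; otherwise the default is added.
     */
    static Map<String, byte[]> withReplyTopic(Map<String, byte[]> recordHeaders, String defaultReplyTopic) {
        Map<String, byte[]> result = new LinkedHashMap<>(recordHeaders);
        result.putIfAbsent(REPLY_TOPIC, defaultReplyTopic.getBytes(StandardCharsets.UTF_8));
        return result;
    }

    /** Decodes the reply-topic header as a UTF-8 string. */
    static String replyTopicOf(Map<String, byte[]> headers) {
        return new String(headers.get(REPLY_TOPIC), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> preset = new LinkedHashMap<>();
        preset.put(REPLY_TOPIC, "custom".getBytes(StandardCharsets.UTF_8));

        // No header present: the default reply topic is added.
        System.out.println(replyTopicOf(withReplyTopic(new LinkedHashMap<>(), "replies")));
        // Header already present: it is left untouched.
        System.out.println(replyTopicOf(withReplyTopic(preset, "replies")));
    }
}
```

In this model the caller's header map is copied, so a record that already carries its own reply topic keeps it and no other headers are added or changed.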
Modifier and Type | Class and Description |
---|---|
static interface | KafkaProducerMessageHandler.ProducerRecordCreator<K,V> Creates a ProducerRecord from a Message and/or properties derived from configuration and/or the message. |
AbstractReplyProducingMessageHandler.RequestHandler
IntegrationManagement.ManagementOverrides
messagingTemplate
EXPRESSION_PARSER, logger
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate) |
Modifier and Type | Method and Description |
---|---|
protected void | doInit() |
String | getComponentType() Subclasses may implement this method to provide component type information. |
protected MessageChannel | getFuturesChannel() |
org.springframework.kafka.support.KafkaHeaderMapper | getHeaderMapper() |
org.springframework.kafka.core.KafkaTemplate<?,?> | getKafkaTemplate() |
protected MessageChannel | getSendFailureChannel() |
protected MessageChannel | getSendSuccessChannel() |
protected Object | handleRequestMessage(Message<?> message) Subclasses must implement this method to handle the request Message. |
boolean | isRunning() |
void | processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel) |
void | setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) Set the error message strategy implementation to use when sending error messages after send failures. |
void | setFlushExpression(Expression flushExpression) Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send. |
void | setFuturesChannel(MessageChannel futuresChannel) Set the futures channel. |
void | setFuturesChannelName(String futuresChannelName) Set the futures channel name. |
void | setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper) Set the header mapper to use. |
void | setMessageKeyExpression(Expression messageKeyExpression) |
void | setPartitionIdExpression(Expression partitionIdExpression) |
void | setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator) Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord. |
void | setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set a message converter for gateway replies. |
void | setReplyPayloadType(Type payloadType) When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. |
void | setSendFailureChannel(MessageChannel sendFailureChannel) Set the failure channel. |
void | setSendFailureChannelName(String sendFailureChannelName) Set the failure channel name. |
void | setSendSuccessChannel(MessageChannel sendSuccessChannel) Set the success channel. |
void | setSendSuccessChannelName(String sendSuccessChannelName) Set the success channel name. |
void | setSendTimeout(long sendTimeout) Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. |
void | setSendTimeoutExpression(Expression sendTimeoutExpression) Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. |
void | setSync(boolean sync) A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. |
void | setTimeoutBuffer(int timeoutBuffer) Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true. |
void | setTimestampExpression(Expression timestampExpression) Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. |
void | setTopicExpression(Expression topicExpression) |
void | start() |
void | stop() |
doInvokeAdvisedRequestHandler, getBeanClassLoader, getIntegrationPatternType, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
handleMessage, onComplete, onError, onNext, onSubscribe
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getThisAs
getBeanName, getComponentName
public void setTopicExpression(Expression topicExpression)
public void setMessageKeyExpression(Expression messageKeyExpression)
public void setPartitionIdExpression(Expression partitionIdExpression)
public void setTimestampExpression(Expression timestampExpression)
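Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. The resulting value should be a Long type representing epoch time in milliseconds.
timestampExpression - the Expression that evaluates to the record timestamp.
public void setFlushExpression(Expression flushExpression)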
Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send. Defaults to looking for a Boolean value in a KafkaIntegrationHeaders.FLUSH header; false if absent.
flushExpression - the Expression.
public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
headerMapper - the mapper; can be null to disable header mapping.
public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
public void setSync(boolean sync)
A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
sync - the send mode; async by default.
public final void setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout also applies when sending to the success or failure channels.
Overrides: setSendTimeout in class AbstractMessageProducingHandler
sendTimeout - the timeout to wait for result for a send operation.
public void setSendTimeoutExpression(Expression sendTimeoutExpression)
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout is applied only in sync mode. If this expression yields a result that is less than that value, the higher value is used.
sendTimeoutExpression - the Expression for timeout to wait for result for a send operation.
See also: setTimeoutBuffer(int)
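The timeout rule described above (a floor of delivery.timeout.ms plus a buffer, with the higher value winning) can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the handler's implementation: the class SendTimeoutSketch and its method are hypothetical, the 5000 ms buffer is taken from the "+ 5 seconds" in the description, and 120000 ms is Kafka's default delivery.timeout.ms.

```java
/** Illustrative model of the documented send-timeout floor. */
final class SendTimeoutSketch {

    /** Kafka's default delivery.timeout.ms (2 minutes). */
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    /** Assumed default buffer, matching the "+ 5 seconds" in the description. */
    static final long DEFAULT_TIMEOUT_BUFFER_MS = 5_000L;

    private SendTimeoutSketch() {
    }

    /**
     * Returns the time to wait for the send result in sync mode:
     * the requested timeout, but never less than delivery timeout + buffer.
     */
    static long effectiveTimeoutMs(long requestedMs, long deliveryTimeoutMs, long bufferMs) {
        long minimumMs = deliveryTimeoutMs + bufferMs;
        return Math.max(requestedMs, minimumMs);
    }

    public static void main(String[] args) {
        // A 10 s request is below the floor, so the floor is used.
        System.out.println(effectiveTimeoutMs(10_000L, DEFAULT_DELIVERY_TIMEOUT_MS, DEFAULT_TIMEOUT_BUFFER_MS)); // prints 125000
        // A 5 min request is above the floor, so it is used as-is.
        System.out.println(effectiveTimeoutMs(300_000L, DEFAULT_DELIVERY_TIMEOUT_MS, DEFAULT_TIMEOUT_BUFFER_MS)); // prints 300000
    }
}
```

Under this model, lowering the wait below the floor means lowering delivery.timeout.ms on the producer or the buffer set with setTimeoutBuffer(int); a smaller send timeout alone has no effect.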
public void setSendFailureChannel(MessageChannel sendFailureChannel)
Set the failure channel. After a send failure, an ErrorMessage will be sent to this channel with a payload of a KafkaSendFailureException with the failed message and cause.
sendFailureChannel - the failure channel.
public void setSendFailureChannelName(String sendFailureChannelName)
Set the failure channel name. After a send failure, an ErrorMessage will be sent to this channel name with a payload of a KafkaSendFailureException with the failed message and cause.
sendFailureChannelName - the failure channel name.
public void setSendSuccessChannel(MessageChannel sendSuccessChannel)
Set the success channel.
sendSuccessChannel - the success channel.
public void setSendSuccessChannelName(String sendSuccessChannelName)
Set the success channel name.
sendSuccessChannelName - the success channel name.
public void setFuturesChannel(MessageChannel futuresChannel)
Set the futures channel.
futuresChannel - the futures channel.
public void setFuturesChannelName(String futuresChannelName)
Set the futures channel name.
futuresChannelName - the futures channel name.
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy)
Set the error message strategy implementation to use when sending error messages after send failures.
errorMessageStrategy - the implementation.
public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
messageConverter - the converter.
See also: setReplyPayloadType(Type)
public void setReplyPayloadType(Type payloadType)
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
payloadType - the type.
See also: setReplyMessageConverter(RecordMessageConverter)
public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator)
Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord.
producerRecordCreator - the creator.
public void setTimeoutBuffer(int timeoutBuffer)
Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
timeoutBuffer - the buffer.
See also: setSendTimeoutExpression(Expression)
public String getComponentType()
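Subclasses may implement this method to provide component type information (description copied from class IntegrationObjectSupport).
Specified by: getComponentType in interface NamedComponent
Overrides: getComponentType in class MessageHandlerSupport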
protected MessageChannel getSendFailureChannel()
protected MessageChannel getSendSuccessChannel()
protected MessageChannel getFuturesChannel()
protected void doInit()
Overrides: doInit in class AbstractReplyProducingMessageHandler
public void start()
Specified by: start in interface Lifecycle
Specified by: start in interface ManageableLifecycle
public void stop()
Specified by: stop in interface Lifecycle
Specified by: stop in interface ManageableLifecycle
public boolean isRunning()
Specified by: isRunning in interface Lifecycle
Specified by: isRunning in interface ManageableLifecycle
protected Object handleRequestMessage(Message<?> message)
Subclasses must implement this method to handle the request Message (description copied from class AbstractReplyProducingMessageHandler).
Specified by: handleRequestMessage in class AbstractReplyProducingMessageHandler
message - The request message.
Returns the result of handling the message, or null.
public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel) throws InterruptedException, ExecutionException