public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>,Collection<org.apache.kafka.common.PartitionInfo>,String> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,KafkaConsumerProperties,KafkaProducerProperties>, org.springframework.beans.factory.DisposableBean
Binder
that uses Kafka as the underlying middleware.AbstractBinder.JavaClassMimeTypeConversion
EXPRESSION_PARSER
logger
Constructor and Description |
---|
KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) |
Modifier and Type | Method and Description |
---|---|
protected Collection<org.apache.kafka.common.PartitionInfo> |
createConsumerDestinationIfNecessary(String name,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties)
Creates the middleware destination the consumer will start to consume data from.
|
protected org.springframework.integration.core.MessageProducer |
createConsumerEndpoint(String name,
String group,
Collection<org.apache.kafka.common.PartitionInfo> destination,
ExtendedConsumerProperties<KafkaConsumerProperties> properties)
Creates
MessageProducer that receives data from the consumer destination. |
protected String |
createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties)
Creates target destinations for outbound channels.
|
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties)
Creates a
MessageHandler with the ability to send data to the
target middleware. |
void |
destroy() |
KafkaConsumerProperties |
getExtendedConsumerProperties(String channelName) |
KafkaProducerProperties |
getExtendedProducerProperties(String channelName) |
void |
onInit()
Subclasses may implement this method to perform any necessary initialization.
|
void |
setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) |
void |
setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) |
void |
setMetadataRetryOperations(org.springframework.retry.RetryOperations metadataRetryOperations)
Retry configuration for operations such as validating topic creation
|
void |
setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |
afterUnbindConsumer, afterUnbindProducer, doBindConsumer, doBindProducer
afterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, getApplicationContext, getBeanFactory, groupedName, setApplicationContext, setCodec, setIntegrationEvaluationContext
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
bindConsumer, bindProducer
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties)
public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation)
public void setMetadataRetryOperations(org.springframework.retry.RetryOperations metadataRetryOperations)
metadataRetryOperations
- the retry configurationpublic void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties)
public void onInit() throws Exception
AbstractBinder
AbstractBinder.afterPropertiesSet()
which is itself final
.onInit
in class AbstractBinder<org.springframework.messaging.MessageChannel,ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>>
Exception
public void destroy() throws Exception
destroy
in interface org.springframework.beans.factory.DisposableBean
Exception
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
getExtendedConsumerProperties
in interface ExtendedBindingProperties<KafkaConsumerProperties,KafkaProducerProperties>
public KafkaProducerProperties getExtendedProducerProperties(String channelName)
getExtendedProducerProperties
in interface ExtendedBindingProperties<KafkaConsumerProperties,KafkaProducerProperties>
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(String destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception
AbstractMessageChannelBinder
MessageHandler
with the ability to send data to the
target middleware. If the returned instance is also a Lifecycle
,
it will be stopped automatically by the binder.
In order to be fully compliant, the MessageHandler
of the binder
must observe the following headers:
BinderHeaders.PARTITION_HEADER
- indicates the target
partition where the message must be sentcreateProducerMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>,Collection<org.apache.kafka.common.PartitionInfo>,String>
destination
- the name of the target destinationproducerProperties
- the producer propertiesException
protected String createProducerDestinationIfNecessary(String name, ExtendedProducerProperties<KafkaProducerProperties> properties)
AbstractMessageChannelBinder
createProducerDestinationIfNecessary
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>,Collection<org.apache.kafka.common.PartitionInfo>,String>
name
- the name of the producer destinationproperties
- producer propertiesprotected Collection<org.apache.kafka.common.PartitionInfo> createConsumerDestinationIfNecessary(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
createConsumerDestinationIfNecessary
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>,Collection<org.apache.kafka.common.PartitionInfo>,String>
name
- the name of the destinationgroup
- the consumer groupproperties
- consumer propertiesprotected org.springframework.integration.core.MessageProducer createConsumerEndpoint(String name, String group, Collection<org.apache.kafka.common.PartitionInfo> destination, ExtendedConsumerProperties<KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
MessageProducer
that receives data from the consumer destination.
will be started and stopped by the binder.createConsumerEndpoint
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,ExtendedProducerProperties<KafkaProducerProperties>,Collection<org.apache.kafka.common.PartitionInfo>,String>
name
- the name of the target destinationgroup
- the consumer groupdestination
- reference to the consumer destinationproperties
- the consumer propertiesCopyright © 2017 Pivotal Software, Inc.. All rights reserved.