public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
A Binder that uses Kafka as the underlying middleware.
AbstractMessageChannelBinder.ErrorInfrastructure, AbstractMessageChannelBinder.PolledConsumerResources
Modifier and Type | Field and Description |
---|---|
static String |
X_EXCEPTION_MESSAGE |
static String |
X_EXCEPTION_STACKTRACE |
static String |
X_ORIGINAL_TOPIC |
provisioningProvider
logger
Constructor and Description |
---|
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider) |
Modifier and Type | Method and Description |
---|---|
protected org.springframework.integration.core.MessageProducer |
createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Creates a MessageProducer that receives data from the consumer destination. |
protected org.springframework.kafka.core.ConsumerFactory<?,?> |
createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected AbstractMessageChannelBinder.PolledConsumerResources |
createPolledConsumerResources(String name,
String group,
ConsumerDestination destination,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel errorChannel)
Creates a MessageHandler with the ability to send data to the target middleware. |
protected org.springframework.messaging.MessageHandler |
getErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.integration.support.ErrorMessageStrategy |
getErrorMessageStrategy()
Binders can return an
ErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message. |
org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties |
getExtendedConsumerProperties(String channelName) |
org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties |
getExtendedProducerProperties(String channelName) |
protected org.springframework.messaging.MessageHandler |
getPolledConsumerErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> |
getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties) |
protected void |
postProcessPollableSource(DefaultPollableMessageSource bindingTarget) |
void |
setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties) |
void |
setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |
afterUnbindConsumer, afterUnbindProducer, bindPollableConsumer, doBindConsumer, doBindProducer, errorsBaseName, errorsBaseName, getApplicationEventPublisher, getDefaultErrorMessageHandler, getErrorBridgeName, getErrorBridgeName, getErrorMessageHandlerName, getErrorRecovererName, getPolledConsumerRecoveryCallback, postProcessOutputChannel, registerErrorInfrastructure, registerErrorInfrastructure, setApplicationEventPublisher
afterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, getApplicationContext, getBeanFactory, getEvaluationContext, groupedName, onInit, serializePayloadIfNecessary, setApplicationContext
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
bindConsumer, bindProducer
public static final String X_EXCEPTION_STACKTRACE
public static final String X_EXCEPTION_MESSAGE
public static final String X_ORIGINAL_TOPIC
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties, org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider)
public void setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties)
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
getExtendedConsumerProperties
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties getExtendedProducerProperties(String channelName)
getExtendedProducerProperties
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) throws Exception
AbstractMessageChannelBinder
Creates a MessageHandler with the ability to send data to the target middleware. If the returned instance is also a Lifecycle, it will be stopped automatically by the binder.
In order to be fully compliant, the MessageHandler of the binder must observe the following headers:
BinderHeaders.PARTITION_HEADER - indicates the target partition where the message must be sent
createProducerMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination - the name of the target destination
producerProperties - the producer properties
errorChannel - the error channel (if enabled, otherwise null). If not null, the binder must wire this channel into the producer endpoint so that errors are forwarded to it.
Throws: Exception
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
protected org.springframework.integration.core.MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
AbstractMessageChannelBinder
Creates a MessageProducer that receives data from the consumer destination. It will be started and stopped by the binder.
createConsumerEndpoint
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination - reference to the consumer destination
group - the consumer group
extendedConsumerProperties - the consumer properties
protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
createPolledConsumerResources
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget)
postProcessPollableSource
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.integration.support.ErrorMessageStrategy getErrorMessageStrategy()
AbstractMessageChannelBinder
Binders can return an ErrorMessageStrategy for building error messages; binder implementations typically might add extra headers to the error message.
getErrorMessageStrategy
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.messaging.MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
getErrorMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination - the destination.
group - the group.
properties - the properties.
protected org.springframework.messaging.MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
getPolledConsumerErrorMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination - the destination.
group - the group.
properties - the properties.
protected org.springframework.kafka.core.ConsumerFactory<?,?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
Copyright © 2019 Pivotal Software, Inc. All rights reserved.