public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
Binder that uses Kafka as the underlying middleware.

Modifier and Type | Class and Description |
---|---|
static class | KafkaMessageChannelBinder.TopicInformation |

Nested classes/interfaces inherited from class AbstractMessageChannelBinder: AbstractMessageChannelBinder.ErrorInfrastructure

Modifier and Type | Field and Description |
---|---|
static String | X_EXCEPTION_MESSAGE |
static String | X_EXCEPTION_STACKTRACE |
static String | X_ORIGINAL_TOPIC |
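
Fields inherited from class AbstractMessageChannelBinder: EXPRESSION_PARSER, provisioningProvider
Fields inherited from class AbstractBinder: contentTypeResolver, logger
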
Constructor and Description |
---|
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties, org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider) |
Modifier and Type | Method and Description |
---|---|
protected org.springframework.integration.core.MessageProducer | createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties) Creates a MessageProducer that receives data from the consumer destination. |
protected org.springframework.messaging.MessageHandler | createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) Creates a MessageHandler with the ability to send data to the target middleware. |
protected org.springframework.messaging.MessageHandler | getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties) Binders can return a message handler to be subscribed to the error channel. |
protected org.springframework.integration.support.ErrorMessageStrategy | getErrorMessageStrategy() Binders can return an ErrorMessageStrategy for building error messages; binder implementations typically might add extra headers to the error message. |
org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties | getExtendedConsumerProperties(String channelName) |
org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties | getExtendedProducerProperties(String channelName) |
void | setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties) |
void | setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |

Methods inherited from class AbstractMessageChannelBinder: afterUnbindConsumer, afterUnbindProducer, doBindConsumer, doBindProducer, errorsBaseName, errorsBaseName, getDefaultErrorMessageHandler, getErrorBridgeName, getErrorBridgeName, getErrorMessageHandlerName, getErrorRecovererName, registerErrorInfrastructure
Methods inherited from class AbstractBinder: afterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, deserializePayloadIfNecessary, deserializePayloadIfNecessary, getApplicationContext, getBeanFactory, groupedName, onInit, serializePayloadIfNecessary, serializePayloadIfNecessary, setApplicationContext, setCodec, setIntegrationEvaluationContext
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Binder: bindConsumer, bindProducer

public static final String X_ORIGINAL_TOPIC
public static final String X_EXCEPTION_MESSAGE
public static final String X_EXCEPTION_STACKTRACE
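The three constants above name headers that carry diagnostic details about a failed record. The following is a minimal, self-contained sketch of how such headers might be assembled before a record is republished elsewhere (for example to a dead-letter destination). It is not the binder's own implementation: the class `DlqHeaderSketch`, the method `errorHeaders`, and the string values assigned to the constants are assumptions made for illustration, since this page lists only the constant names.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the binder API.
class DlqHeaderSketch {

    // Assumed values: this page documents the constant names only.
    static final String X_ORIGINAL_TOPIC = "x-original-topic";
    static final String X_EXCEPTION_MESSAGE = "x-exception-message";
    static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    /** Builds diagnostic headers for a record that failed processing. */
    static Map<String, String> errorHeaders(String originalTopic, Throwable cause) {
        // Render the stack trace into a string so it can travel as a header value.
        StringWriter stackTrace = new StringWriter();
        cause.printStackTrace(new PrintWriter(stackTrace, true));

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(X_ORIGINAL_TOPIC, originalTopic);
        headers.put(X_EXCEPTION_MESSAGE, String.valueOf(cause.getMessage()));
        headers.put(X_EXCEPTION_STACKTRACE, stackTrace.toString());
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers =
                errorHeaders("orders", new IllegalStateException("payload could not be parsed"));
        // Print each header name with only the first line of its value.
        headers.forEach((name, value) ->
                System.out.println(name + " = " + value.split("\\R", 2)[0]));
    }
}
```

A consumer of the republished record could then read the original topic header to decide where a repaired message should be sent.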
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties, org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider)
public void setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties)
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
Specified by: getExtendedConsumerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties getExtendedProducerProperties(String channelName)
Specified by: getExtendedProducerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) throws Exception
Description copied from class: AbstractMessageChannelBinder
Creates a MessageHandler with the ability to send data to the target middleware. If the returned instance is also a Lifecycle, it will be stopped automatically by the binder.
In order to be fully compliant, the MessageHandler of the binder must observe the following headers:
BinderHeaders.PARTITION_HEADER - indicates the target partition where the message must be sent
Specified by: createProducerMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - the name of the target destination
producerProperties - the producer properties
errorChannel - the error channel (if enabled, otherwise null). If not null, the binder must wire this channel into the producer endpoint so that errors are forwarded to it.
Throws:
Exception
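The header contract described above can be illustrated with a small, self-contained sketch. It shows one way a producer-side handler might honor a partition header: use the header value when present, otherwise fall back to a key hash. This is only an illustration under stated assumptions, not the binder's code. The class `PartitionHeaderSketch`, the method `resolvePartition`, the key-hash fallback, and the literal header name are all hypothetical; this page refers to the header only as BinderHeaders.PARTITION_HEADER.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the binder API.
class PartitionHeaderSketch {

    // Assumed header name; the page only names BinderHeaders.PARTITION_HEADER.
    static final String PARTITION_HEADER = "scst_partition";

    /**
     * Picks the target partition. A numeric partition header wins when present;
     * otherwise the key's hash is mapped onto the available partitions
     * (the fallback is an assumption made for this sketch).
     */
    static int resolvePartition(Map<String, Object> headers, String key, int partitionCount) {
        Object requested = headers.get(PARTITION_HEADER);
        if (requested instanceof Number) {
            int partition = ((Number) requested).intValue();
            if (partition < 0 || partition >= partitionCount) {
                throw new IllegalArgumentException(
                        "Partition " + partition + " is outside 0.." + (partitionCount - 1));
            }
            return partition;
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key == null ? 0 : key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(PARTITION_HEADER, 2);
        // Header present: the requested partition is used as-is.
        System.out.println(resolvePartition(headers, "order-17", 4));
        // Header absent: the partition is derived from the key.
        System.out.println(resolvePartition(new HashMap<>(), "order-17", 4));
    }
}
```

Validating the requested partition against the partition count up front surfaces a misconfigured upstream partitioner as an immediate error rather than a failed send.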
protected org.springframework.integration.core.MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Description copied from class: AbstractMessageChannelBinder
Creates a MessageProducer that receives data from the consumer destination. The returned endpoint will be started and stopped by the binder.
Specified by: createConsumerEndpoint in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - reference to the consumer destination
group - the consumer group
extendedConsumerProperties - the consumer properties
protected org.springframework.integration.support.ErrorMessageStrategy getErrorMessageStrategy()
Description copied from class: AbstractMessageChannelBinder
Binders can return an ErrorMessageStrategy for building error messages; binder implementations typically might add extra headers to the error message.
Overrides: getErrorMessageStrategy in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.messaging.MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Description copied from class: AbstractMessageChannelBinder
Binders can return a message handler to be subscribed to the error channel.
Overrides: getErrorMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - the destination.
group - the group.
extendedConsumerProperties - the properties.

Copyright © 2017 Pivotal Software, Inc. All rights reserved.