public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
Binder that uses Kafka as the underlying middleware.

Nested classes inherited from AbstractMessageChannelBinder: AbstractMessageChannelBinder.ErrorInfrastructure, AbstractMessageChannelBinder.PolledConsumerResources

| Modifier and Type | Field and Description |
|---|---|
static String |
X_EXCEPTION_MESSAGE |
static String |
X_EXCEPTION_STACKTRACE |
static String |
X_ORIGINAL_TOPIC |
Inherited fields: provisioningProvider, logger

| Constructor and Description |
|---|
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider) |
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer) |
| Modifier and Type | Method and Description |
|---|---|
protected org.springframework.integration.core.MessageProducer |
createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Creates a
MessageProducer that receives data from the consumer destination. |
protected org.springframework.kafka.core.ConsumerFactory<?,?> |
createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected AbstractMessageChannelBinder.PolledConsumerResources |
createPolledConsumerResources(String name,
String group,
ConsumerDestination destination,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel errorChannel)
Creates a
MessageHandler with the ability to send data to the target
middleware. |
protected org.springframework.messaging.MessageHandler |
getErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.integration.support.ErrorMessageStrategy |
getErrorMessageStrategy()
Binders can return an
ErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message. |
org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties |
getExtendedConsumerProperties(String channelName) |
org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties |
getExtendedProducerProperties(String channelName) |
protected org.springframework.messaging.MessageHandler |
getPolledConsumerErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> |
getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties) |
protected void |
postProcessPollableSource(DefaultPollableMessageSource bindingTarget) |
Collection<org.apache.kafka.common.PartitionInfo> |
processTopic(String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties,
org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory,
int partitionCount,
boolean usingPatterns,
boolean groupManagement,
String topic) |
void |
setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties) |
void |
setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |
Inherited methods:
- afterUnbindConsumer, afterUnbindProducer, bindPollableConsumer, doBindConsumer, doBindProducer, errorsBaseName, errorsBaseName, getApplicationEventPublisher, getContainerCustomizer, getDefaultErrorMessageHandler, getErrorBridgeName, getErrorBridgeName, getErrorMessageHandlerName, getErrorRecovererName, getPolledConsumerRecoveryCallback, postProcessOutputChannel, registerErrorInfrastructure, registerErrorInfrastructure, setApplicationEventPublisher
- afterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, getApplicationContext, getBeanFactory, getEvaluationContext, groupedName, onInit, serializePayloadIfNecessary, setApplicationContext
- clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
- bindConsumer, bindProducer

public static final String X_EXCEPTION_STACKTRACE
public static final String X_EXCEPTION_MESSAGE
public static final String X_ORIGINAL_TOPIC
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider)
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer)
public void setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties)
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
getExtendedConsumerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties getExtendedProducerProperties(String channelName)
getExtendedProducerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) throws Exception
Description copied from class: AbstractMessageChannelBinder
Creates a MessageHandler with the ability to send data to the target
middleware. If the returned instance is also a Lifecycle, it will be
stopped automatically by the binder.
In order to be fully compliant, the MessageHandler of the binder must
observe the following headers:
BinderHeaders.PARTITION_HEADER - indicates the target partition where
the message must be sent
createProducerMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - the name of the target destination
producerProperties - the producer properties
errorChannel - the error channel (if enabled, otherwise null). If not null,
the binder must wire this channel into the producer endpoint so that errors
are forwarded to it.
Throws:
Exception
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
protected org.springframework.integration.core.MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Description copied from class: AbstractMessageChannelBinder
Creates a MessageProducer that receives data from the consumer destination.
It will be started and stopped by the binder.
createConsumerEndpoint in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - reference to the consumer destination
group - the consumer group
extendedConsumerProperties - the consumer properties
public Collection<org.apache.kafka.common.PartitionInfo> processTopic(String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties, org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory, int partitionCount, boolean usingPatterns, boolean groupManagement, String topic)
protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
createPolledConsumerResources in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget)
postProcessPollableSource in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.integration.support.ErrorMessageStrategy getErrorMessageStrategy()
Description copied from class: AbstractMessageChannelBinder
Binders can return an ErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message.
getErrorMessageStrategy in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.messaging.MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Description copied from class: AbstractMessageChannelBinder
Binders can return a message handler to be subscribed to the error channel.
getErrorMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - the destination.
group - the group.
properties - the properties.
protected org.springframework.messaging.MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Description copied from class: AbstractMessageChannelBinder
Binders can return a message handler to be subscribed to the error channel.
getPolledConsumerErrorMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
Parameters:
destination - the destination.
group - the group.
properties - the properties.
protected org.springframework.kafka.core.ConsumerFactory<?,?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
Copyright © 2018 Pivotal Software, Inc. All rights reserved.