public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
Binder that uses Kafka as the underlying middleware.AbstractMessageChannelBinder.ErrorInfrastructure, AbstractMessageChannelBinder.PolledConsumerResources| Modifier and Type | Field and Description |
|---|---|
static String |
X_EXCEPTION_FQCN |
static String |
X_EXCEPTION_MESSAGE |
static String |
X_EXCEPTION_STACKTRACE |
static String |
X_ORIGINAL_OFFSET |
static String |
X_ORIGINAL_PARTITION |
static String |
X_ORIGINAL_TIMESTAMP |
static String |
X_ORIGINAL_TIMESTAMP_TYPE |
static String |
X_ORIGINAL_TOPIC |
provisioningProviderlogger| Constructor and Description |
|---|
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider) |
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer) |
| Modifier and Type | Method and Description |
|---|---|
protected org.springframework.integration.core.MessageProducer |
createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Creates
MessageProducer that receives data from the consumer destination. |
protected org.springframework.kafka.core.ConsumerFactory<?,?> |
createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected AbstractMessageChannelBinder.PolledConsumerResources |
createPolledConsumerResources(String name,
String group,
ConsumerDestination destination,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel errorChannel)
Create a
MessageHandler with the ability to send data to the target
middleware. |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel channel,
org.springframework.messaging.MessageChannel errorChannel)
Create a
MessageHandler with the ability to send data to the target
middleware. |
String |
getDefaultsPrefix()
Extended binding properties can define a default prefix to place all the extended
common producer and consumer properties.
|
protected org.springframework.messaging.MessageHandler |
getErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.integration.support.ErrorMessageStrategy |
getErrorMessageStrategy()
Binders can return an
ErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message. |
org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties |
getExtendedConsumerProperties(String channelName) |
org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties |
getExtendedProducerProperties(String channelName) |
Class<? extends BinderSpecificPropertiesProvider> |
getExtendedPropertiesEntryClass()
Extended properties class which should be a subclass of
BinderSpecificPropertiesProvider
against which default extended producer and consumer properties
are resolved. |
protected org.springframework.messaging.MessageHandler |
getPolledConsumerErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> |
getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties) |
protected void |
postProcessPollableSource(DefaultPollableMessageSource bindingTarget) |
Collection<org.apache.kafka.common.PartitionInfo> |
processTopic(String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties,
org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory,
int partitionCount,
boolean usingPatterns,
boolean groupManagement,
String topic) |
void |
setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties) |
void |
setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |
protected boolean |
useNativeEncoding(ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
Whether the producer for the destination being created should be configured to use
native encoding which may, or may not, be determined from the properties.
|
afterUnbindConsumer, afterUnbindProducer, bindPollableConsumer, doBindConsumer, doBindProducer, errorsBaseName, errorsBaseName, getApplicationEventPublisher, getContainerCustomizer, getDefaultErrorMessageHandler, getErrorBridgeName, getErrorBridgeName, getErrorMessageHandlerName, getErrorRecovererName, getPolledConsumerRecoveryCallback, postProcessOutputChannel, registerErrorInfrastructure, registerErrorInfrastructure, setApplicationEventPublisherafterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, getApplicationContext, getBeanFactory, getEvaluationContext, groupedName, onInit, serializePayloadIfNecessary, setApplicationContextclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitbindConsumer, bindProducerpublic static final String X_EXCEPTION_FQCN
public static final String X_EXCEPTION_STACKTRACE
public static final String X_EXCEPTION_MESSAGE
public static final String X_ORIGINAL_TOPIC
public static final String X_ORIGINAL_PARTITION
public static final String X_ORIGINAL_OFFSET
public static final String X_ORIGINAL_TIMESTAMP
public static final String X_ORIGINAL_TIMESTAMP_TYPE
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider)
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer)
public void setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties)
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
getExtendedConsumerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>public org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties getExtendedProducerProperties(String channelName)
getExtendedProducerProperties in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>public String getDefaultsPrefix()
ExtendedBindingPropertiesgetDefaultsPrefix in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass()
ExtendedBindingPropertiesBinderSpecificPropertiesProvider
against which default extended producer and consumer properties
are resolved.getExtendedPropertiesEntryClass in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) throws Exception
AbstractMessageChannelBinderMessageHandler with the ability to send data to the target
middleware. If the returned instance is also a Lifecycle, it will be
stopped automatically by the binder.
In order to be fully compliant, the MessageHandler of the binder must
observe the following headers:
BinderHeaders.PARTITION_HEADER - indicates the target partition where
the message must be sentcreateProducerMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>destination - the name of the target destinationproducerProperties - the producer propertieserrorChannel - the error channel (if enabled, otherwise null). If not null,
the binder must wire this channel into the producer endpoint so that errors
are forwarded to it.Exceptionprotected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel channel, org.springframework.messaging.MessageChannel errorChannel) throws Exception
AbstractMessageChannelBinderMessageHandler with the ability to send data to the target
middleware. If the returned instance is also a Lifecycle, it will be
stopped automatically by the binder.
In order to be fully compliant, the MessageHandler of the binder must
observe the following headers:
BinderHeaders.PARTITION_HEADER - indicates the target partition where
the message must be sentcreateProducerMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>destination - the name of the target destination.producerProperties - the producer properties.channel - the channel to bind.errorChannel - the error channel (if enabled, otherwise null). If not null,
the binder must wire this channel into the producer endpoint so that errors
are forwarded to it.Exceptionprotected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
protected boolean useNativeEncoding(ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
AbstractMessageChannelBinderP#isUseNativeEncoding().useNativeEncoding in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>producerProperties - the properties.protected org.springframework.integration.core.MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
AbstractMessageChannelBinderMessageProducer that receives data from the consumer destination.
will be started and stopped by the binder.createConsumerEndpoint in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>destination - reference to the consumer destinationgroup - the consumer groupextendedConsumerProperties - the consumer propertiespublic Collection<org.apache.kafka.common.PartitionInfo> processTopic(String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties, org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory, int partitionCount, boolean usingPatterns, boolean groupManagement, String topic)
protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
createPolledConsumerResources in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget)
postProcessPollableSource in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>protected org.springframework.integration.support.ErrorMessageStrategy getErrorMessageStrategy()
AbstractMessageChannelBinderErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message.getErrorMessageStrategy in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>protected org.springframework.messaging.MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBindergetErrorMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>destination - the destination.group - the group.properties - the properties.protected org.springframework.messaging.MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBindergetPolledConsumerErrorMessageHandler in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>destination - the destination.group - the group.properties - the properties.protected org.springframework.kafka.core.ConsumerFactory<?,?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
Copyright © 2018 Pivotal Software, Inc.. All rights reserved.